Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/01 09:29:41 UTC

[GitHub] asfgit closed pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch

asfgit closed pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index f79dd93486e..0b7d8948f79 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -43,6 +43,7 @@ The following table list all available connectors and formats. Their mutual comp
 | Name              | Version       | Maven dependency             | SQL Client JAR         |
 | :---------------- | :------------ | :--------------------------- | :----------------------|
 | Filesystem        |               | Built-in                     | Built-in               |
+| Elasticsearch     | 6             | `flink-connector-elasticsearch6` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
 | Apache Kafka      | 0.8           | `flink-connector-kafka-0.8`  | Not available          |
 | Apache Kafka      | 0.9           | `flink-connector-kafka-0.9`  | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
 | Apache Kafka      | 0.10          | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
@@ -588,6 +589,111 @@ Make sure to add the version-specific Kafka dependency. In addition, a correspon
 
 {% top %}
 
+### Elasticsearch Connector
+
+<span class="label label-primary">Sink: Streaming Append Mode</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+<span class="label label-info">Format: JSON-only</span>
+
+The Elasticsearch connector allows for writing into an index of the Elasticsearch search engine.
+
+The connector can operate in [upsert mode](#update-modes) for exchanging UPSERT/DELETE messages with the external system using a [key defined by the query](streaming.html#table-to-stream-conversion).
+
+For append-only queries, the connector can also operate in [append mode](#update-modes) for exchanging only INSERT messages with the external system. If no key is defined by the query, a key is automatically generated by Elasticsearch.
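+
+For illustration, the following is a minimal sketch (a table environment `tableEnv` and a registered table `clicks` with columns `username` and `url` are assumed and purely hypothetical): the grouped aggregation produces an updating result whose upsert key consists of the grouping fields, while the plain projection produces an append-only result without a key.
+
+{% highlight java %}
+// updating result with key (username): can be written in upsert mode
+Table userCounts = tableEnv.sqlQuery(
+  "SELECT username, COUNT(*) AS cnt FROM clicks GROUP BY username");
+
+// append-only result without a key: can be written in append mode
+Table appendOnly = tableEnv.sqlQuery(
+  "SELECT username, url FROM clicks");
+{% endhighlight %}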
+
+The connector can be defined as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+.connect(
+  new Elasticsearch()
+    .version("6")                      // required: valid connector versions are "6"
+    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
+    .index("MyUsers")                  // required: Elasticsearch index
+    .documentType("user")              // required: Elasticsearch document type
+
+    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
+                              //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+    .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)
+
+    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
+    .failureHandlerFail()          // optional: throws an exception if a request fails and causes a job failure
+    .failureHandlerIgnore()        //   or ignores failures and drops the request
+    .failureHandlerRetryRejected() //   or re-adds requests that have failed due to queue capacity saturation
+    .failureHandlerCustom(...)     //   or custom failure handling with an ActionRequestFailureHandler subclass
+
+    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+    .disableFlushOnCheckpoint()    // optional: disables flushing on checkpoint (see notes below!)
+    .bulkFlushMaxActions(42)       // optional: maximum number of actions to buffer for each bulk request
+    .bulkFlushMaxSize("42 mb")     // optional: maximum size of buffered actions in bytes per bulk request
+                                   //   (only MB granularity is supported)
+    .bulkFlushInterval(60000L)     // optional: bulk flush interval (in milliseconds)
+
+    .bulkFlushBackoffConstant()    // optional: use a constant backoff type
+    .bulkFlushBackoffExponential() //   or use an exponential backoff type
+    .bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
+    .bulkFlushBackoffDelay(30000L) // optional: delay between each backoff attempt (in milliseconds)
+
+    // optional: connection properties to be used during REST communication to Elasticsearch
+    .connectionMaxRetryTimeout(3)  // optional: maximum timeout (in milliseconds) between retries
+    .connectionPathPrefix("/v1")   // optional: prefix string to be added to every REST communication
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+connector:
+  type: elasticsearch
+  version: 6                # required: valid connector versions are "6"
+  hosts:                    # required: one or more Elasticsearch hosts to connect to
+    - hostname: "localhost"
+      port: 9200
+      protocol: "http"
+  index: "MyUsers"          # required: Elasticsearch index
+  document-type: "user"     # required: Elasticsearch document type
+
+  key-delimiter: "$"        # optional: delimiter for composite keys ("_" by default)
+                            #   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+  key-null-literal: "n/a"   # optional: representation for null fields in keys ("null" by default)
+
+  # optional: failure handling strategy in case a request to Elasticsearch fails ("fail" by default)
+  failure-handler: ...      # valid strategies are "fail" (throws an exception if a request fails and
+                            #   thus causes a job failure), "ignore" (ignores failures and drops the request),
+                            #   "retry-rejected" (re-adds requests that have failed due to queue capacity
+                            #   saturation), or "custom" for failure handling with an
+                            #   ActionRequestFailureHandler subclass
+
+  # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+  flush-on-checkpoint: true     # optional: set to "false" to disable flushing on checkpoint (see notes below!); "true" by default
+  bulk-flush:
+    max-actions: 42             # optional: maximum number of actions to buffer for each bulk request
+    max-size: 42 mb             # optional: maximum size of buffered actions in bytes per bulk request
+                                #   (only MB granularity is supported)
+    interval: 60000             # optional: bulk flush interval (in milliseconds)
+    back-off:                   # optional: backoff strategy ("disabled" by default)
+      type: ...                 #   valid strategies are "disabled", "constant", or "exponential"
+      max-retries: 3            # optional: maximum number of retries
+      delay: 30000              # optional: delay between each backoff attempt (in milliseconds)
+
+  # optional: connection properties to be used during REST communication to Elasticsearch
+  connection-max-retry-timeout: 3     # optional: maximum timeout (in milliseconds) between retries
+  connection-path-prefix: "/v1"       # optional: prefix string to be added to every REST communication
+{% endhighlight %}
+</div>
+</div>
+
+**Bulk flushing:** For more information about characteristics of the optional flushing parameters see the [corresponding low-level documentation]({{ site.baseurl }}/dev/connectors/elasticsearch.html).
+
+**Disabling flushing on checkpoint:** When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests.
+
+**Key extraction:** Flink automatically extracts valid keys from a query. For example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key of the fields `a` and `b`. The Elasticsearch connector generates a document ID string for every row by concatenating all key fields in the order defined in the query using a key delimiter. A custom representation of null literals for key fields can be defined.
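+
+For illustration, a minimal sketch continuing the hypothetical `clicks` table from above: the query below defines the composite key `(username, category)`, so with the default delimiter a result row `('bob', 'books', 3)` is written under the document ID `"bob_books"` (or `"bob$books"` if the key delimiter is set to `"$"`).
+
+{% highlight java %}
+// composite key (username, category); document IDs are built as username + delimiter + category
+Table categoryCounts = tableEnv.sqlQuery(
+  "SELECT username, category, COUNT(*) AS cnt FROM clicks GROUP BY username, category");
+{% endhighlight %}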
+
+<span class="label label-danger">Attention</span> A JSON format defines how to encode documents for the external system; therefore, it must be added as a [dependency](connect.html#formats).
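+
+The following end-to-end sketch combines the connector with the JSON format and the grouped query from above. It is only an illustration: the sink name `esUserCounts`, the table `clicks`, and its columns are hypothetical, and the JSON format simply derives its schema from the table schema.
+
+{% highlight java %}
+tableEnv
+  .connect(
+    new Elasticsearch()
+      .version("6")
+      .host("localhost", 9200, "http")
+      .index("MyUsers")
+      .documentType("user"))
+  .withFormat(
+    new Json().deriveSchema())       // requires the flink-json dependency
+  .withSchema(
+    new Schema()
+      .field("username", "VARCHAR")
+      .field("cnt", "BIGINT"))
+  .inUpsertMode()
+  .registerTableSink("esUserCounts");
+
+tableEnv.sqlUpdate(
+  "INSERT INTO esUserCounts SELECT username, COUNT(*) AS cnt FROM clicks GROUP BY username");
+{% endhighlight %}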
+
+{% top %}
+
 Table Formats
 -------------
 
diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
index abb8fbfecdf..9fbd9b37c80 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -70,6 +70,16 @@ under the License.
 			</exclusions>
 		</dependency>
 
+		<!-- Used for the Elasticsearch table sink. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
@@ -95,6 +105,23 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<!-- Elasticsearch table descriptor testing -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- Elasticsearch table sink factory testing -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
new file mode 100644
index 00000000000..9bee8c38f28
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.typeutils.TypeCheckUtils;
+import org.apache.flink.table.util.TableConnectorUtil;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A version-agnostic Elasticsearch {@link UpsertStreamTableSink}.
+ */
+@Internal
+public abstract class ElasticsearchUpsertTableSinkBase implements UpsertStreamTableSink<Row> {
+
+	/** Flag that indicates that only inserts are accepted. */
+	private final boolean isAppendOnly;
+
+	/** Schema of the table. */
+	private final TableSchema schema;
+
+	/** Version-agnostic hosts configuration. */
+	private final List<Host> hosts;
+
+	/** Default index for all requests. */
+	private final String index;
+
+	/** Default document type for all requests. */
+	private final String docType;
+
+	/** Delimiter for composite keys. */
+	private final String keyDelimiter;
+
+	/** String literal for null keys. */
+	private final String keyNullLiteral;
+
+	/** Serialization schema used for the document. */
+	private final SerializationSchema<Row> serializationSchema;
+
+	/** Content type describing the serialization schema. */
+	private final XContentType contentType;
+
+	/** Failure handler for failing {@link ActionRequest}s. */
+	private final ActionRequestFailureHandler failureHandler;
+
+	/**
+	 * Map of optional configuration parameters for the Elasticsearch sink. The config is
+	 * internal and can change at any time.
+	 */
+	private final Map<SinkOption, String> sinkOptions;
+
+	/**
+	 * Version-agnostic creation of {@link ActionRequest}s.
+	 */
+	private final RequestFactory requestFactory;
+
+	/** Key field indices determined by the query. */
+	private int[] keyFieldIndices = new int[0];
+
+	public ElasticsearchUpsertTableSinkBase(
+			boolean isAppendOnly,
+			TableSchema schema,
+			List<Host> hosts,
+			String index,
+			String docType,
+			String keyDelimiter,
+			String keyNullLiteral,
+			SerializationSchema<Row> serializationSchema,
+			XContentType contentType,
+			ActionRequestFailureHandler failureHandler,
+			Map<SinkOption, String> sinkOptions,
+			RequestFactory requestFactory) {
+
+		this.isAppendOnly = isAppendOnly;
+		this.schema = Preconditions.checkNotNull(schema);
+		this.hosts = Preconditions.checkNotNull(hosts);
+		this.index = Preconditions.checkNotNull(index);
+		this.keyDelimiter = Preconditions.checkNotNull(keyDelimiter);
+		this.keyNullLiteral = Preconditions.checkNotNull(keyNullLiteral);
+		this.docType = Preconditions.checkNotNull(docType);
+		this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+		this.contentType = Preconditions.checkNotNull(contentType);
+		this.failureHandler = Preconditions.checkNotNull(failureHandler);
+		this.sinkOptions = Preconditions.checkNotNull(sinkOptions);
+		this.requestFactory = Preconditions.checkNotNull(requestFactory);
+	}
+
+	@Override
+	public void setKeyFields(String[] keyNames) {
+		if (keyNames == null) {
+			this.keyFieldIndices = new int[0];
+			return;
+		}
+
+		final String[] fieldNames = getFieldNames();
+		final int[] keyFieldIndices = new int[keyNames.length];
+		for (int i = 0; i < keyNames.length; i++) {
+			keyFieldIndices[i] = -1;
+			for (int j = 0; j < fieldNames.length; j++) {
+				if (keyNames[i].equals(fieldNames[j])) {
+					keyFieldIndices[i] = j;
+					break;
+				}
+			}
+			if (keyFieldIndices[i] == -1) {
+				throw new RuntimeException("Invalid key fields: " + Arrays.toString(keyNames));
+			}
+		}
+
+		validateKeyTypes(keyFieldIndices);
+
+		this.keyFieldIndices = keyFieldIndices;
+	}
+
+	@Override
+	public void setIsAppendOnly(Boolean isAppendOnly) {
+		if (this.isAppendOnly && !isAppendOnly) {
+			throw new ValidationException(
+				"The given query is not supported by this sink because the sink is configured to " +
+				"operate in append mode only. Thus, it only supports insertions (no queries " +
+				"with updating results).");
+		}
+	}
+
+	@Override
+	public TypeInformation<Row> getRecordType() {
+		return schema.toRowType();
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+		final ElasticsearchUpsertSinkFunction upsertFunction =
+			new ElasticsearchUpsertSinkFunction(
+				index,
+				docType,
+				keyDelimiter,
+				keyNullLiteral,
+				serializationSchema,
+				contentType,
+				requestFactory,
+				keyFieldIndices);
+		final SinkFunction<Tuple2<Boolean, Row>> sinkFunction = createSinkFunction(
+			hosts,
+			failureHandler,
+			sinkOptions,
+			upsertFunction);
+		dataStream.addSink(sinkFunction)
+			.name(TableConnectorUtil.generateRuntimeName(this.getClass(), getFieldNames()));
+	}
+
+	@Override
+	public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
+		return Types.TUPLE(Types.BOOLEAN, getRecordType());
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return schema.getColumnNames();
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return schema.getTypes();
+	}
+
+	@Override
+	public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
+			throw new ValidationException("Reconfiguration with different fields is not allowed. " +
+				"Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
+				"But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
+		}
+		return copy(
+			isAppendOnly,
+			schema,
+			hosts,
+			index,
+			docType,
+			keyDelimiter,
+			keyNullLiteral,
+			serializationSchema,
+			contentType,
+			failureHandler,
+			sinkOptions,
+			requestFactory);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		ElasticsearchUpsertTableSinkBase that = (ElasticsearchUpsertTableSinkBase) o;
+		return Objects.equals(isAppendOnly, that.isAppendOnly) &&
+			Objects.equals(schema, that.schema) &&
+			Objects.equals(hosts, that.hosts) &&
+			Objects.equals(index, that.index) &&
+			Objects.equals(docType, that.docType) &&
+			Objects.equals(keyDelimiter, that.keyDelimiter) &&
+			Objects.equals(keyNullLiteral, that.keyNullLiteral) &&
+			Objects.equals(serializationSchema, that.serializationSchema) &&
+			Objects.equals(contentType, that.contentType) &&
+			Objects.equals(failureHandler, that.failureHandler) &&
+			Objects.equals(sinkOptions, that.sinkOptions);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(
+			isAppendOnly,
+			schema,
+			hosts,
+			index,
+			docType,
+			keyDelimiter,
+			keyNullLiteral,
+			serializationSchema,
+			contentType,
+			failureHandler,
+			sinkOptions);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// For version-specific implementations
+	// --------------------------------------------------------------------------------------------
+
+	protected abstract ElasticsearchUpsertTableSinkBase copy(
+		boolean isAppendOnly,
+		TableSchema schema,
+		List<Host> hosts,
+		String index,
+		String docType,
+		String keyDelimiter,
+		String keyNullLiteral,
+		SerializationSchema<Row> serializationSchema,
+		XContentType contentType,
+		ActionRequestFailureHandler failureHandler,
+		Map<SinkOption, String> sinkOptions,
+		RequestFactory requestFactory);
+
+	protected abstract SinkFunction<Tuple2<Boolean, Row>> createSinkFunction(
+		List<Host> hosts,
+		ActionRequestFailureHandler failureHandler,
+		Map<SinkOption, String> sinkOptions,
+		ElasticsearchUpsertSinkFunction upsertFunction);
+
+	// --------------------------------------------------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Validate the types that are used for conversion to string.
+	 */
+	private void validateKeyTypes(int[] keyFieldIndices) {
+		final TypeInformation<?>[] types = getFieldTypes();
+		for (int keyFieldIndex : keyFieldIndices) {
+			final TypeInformation<?> type = types[keyFieldIndex];
+			if (!TypeCheckUtils.isSimpleStringRepresentation(type)) {
+				throw new ValidationException(
+					"Only simple types that can be safely converted into a string representation " +
+						"can be used as keys. But was: " + type);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Keys for optional parameterization of the sink.
+	 */
+	public enum SinkOption {
+		DISABLE_FLUSH_ON_CHECKPOINT,
+		BULK_FLUSH_MAX_ACTIONS,
+		BULK_FLUSH_MAX_SIZE,
+		BULK_FLUSH_INTERVAL,
+		BULK_FLUSH_BACKOFF_ENABLED,
+		BULK_FLUSH_BACKOFF_TYPE,
+		BULK_FLUSH_BACKOFF_RETRIES,
+		BULK_FLUSH_BACKOFF_DELAY,
+		REST_MAX_RETRY_TIMEOUT,
+		REST_PATH_PREFIX
+	}
+
+	/**
+	 * Entity for describing a host of Elasticsearch.
+	 */
+	public static class Host {
+		public final String hostname;
+		public final int port;
+		public final String protocol;
+
+		public Host(String hostname, int port, String protocol) {
+			this.hostname = hostname;
+			this.port = port;
+			this.protocol = protocol;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			Host host = (Host) o;
+			return port == host.port &&
+				Objects.equals(hostname, host.hostname) &&
+				Objects.equals(protocol, host.protocol);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(
+				hostname,
+				port,
+				protocol);
+		}
+	}
+
+	/**
+	 * For version-agnostic creation of {@link ActionRequest}s.
+	 */
+	public interface RequestFactory extends Serializable {
+
+		/**
+		 * Creates an update request to be added to a {@link RequestIndexer}.
+		 */
+		UpdateRequest createUpdateRequest(
+			String index,
+			String docType,
+			String key,
+			XContentType contentType,
+			byte[] document);
+
+		/**
+		 * Creates an index request to be added to a {@link RequestIndexer}.
+		 */
+		IndexRequest createIndexRequest(
+			String index,
+			String docType,
+			XContentType contentType,
+			byte[] document);
+
+		/**
+		 * Creates a delete request to be added to a {@link RequestIndexer}.
+		 */
+		DeleteRequest createDeleteRequest(
+			String index,
+			String docType,
+			String key);
+	}
+
+	/**
+	 * Sink function for converting upserts into Elasticsearch {@link ActionRequest}s.
+	 */
+	public static class ElasticsearchUpsertSinkFunction implements ElasticsearchSinkFunction<Tuple2<Boolean, Row>> {
+
+		private final String index;
+		private final String docType;
+		private final String keyDelimiter;
+		private final String keyNullLiteral;
+		private final SerializationSchema<Row> serializationSchema;
+		private final XContentType contentType;
+		private final RequestFactory requestFactory;
+		private final int[] keyFieldIndices;
+
+		public ElasticsearchUpsertSinkFunction(
+				String index,
+				String docType,
+				String keyDelimiter,
+				String keyNullLiteral,
+				SerializationSchema<Row> serializationSchema,
+				XContentType contentType,
+				RequestFactory requestFactory,
+				int[] keyFieldIndices) {
+
+			this.index = Preconditions.checkNotNull(index);
+			this.docType = Preconditions.checkNotNull(docType);
+			this.keyDelimiter = Preconditions.checkNotNull(keyDelimiter);
+			this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+			this.contentType = Preconditions.checkNotNull(contentType);
+			this.keyFieldIndices = Preconditions.checkNotNull(keyFieldIndices);
+			this.requestFactory = Preconditions.checkNotNull(requestFactory);
+			this.keyNullLiteral = Preconditions.checkNotNull(keyNullLiteral);
+		}
+
+		@Override
+		public void process(Tuple2<Boolean, Row> element, RuntimeContext ctx, RequestIndexer indexer) {
+			if (element.f0) {
+				processUpsert(element.f1, indexer);
+			} else {
+				processDelete(element.f1, indexer);
+			}
+		}
+
+		private void processUpsert(Row row, RequestIndexer indexer) {
+			final byte[] document = serializationSchema.serialize(row);
+			if (keyFieldIndices.length == 0) {
+				final IndexRequest indexRequest = requestFactory.createIndexRequest(
+					index,
+					docType,
+					contentType,
+					document);
+				indexer.add(indexRequest);
+			} else {
+				final String key = createKey(row);
+				final UpdateRequest updateRequest = requestFactory.createUpdateRequest(
+					index,
+					docType,
+					key,
+					contentType,
+					document);
+				indexer.add(updateRequest);
+			}
+		}
+
+		private void processDelete(Row row, RequestIndexer indexer) {
+			final String key = createKey(row);
+			final DeleteRequest deleteRequest = requestFactory.createDeleteRequest(
+				index,
+				docType,
+				key);
+			indexer.add(deleteRequest);
+		}
+
+		private String createKey(Row row) {
+			final StringBuilder builder = new StringBuilder();
+			for (int i = 0; i < keyFieldIndices.length; i++) {
+				final int keyFieldIndex = keyFieldIndices[i];
+				if (i > 0) {
+					builder.append(keyDelimiter);
+				}
+				final Object value = row.getField(keyFieldIndex);
+				if (value == null) {
+					builder.append(keyNullLiteral);
+				} else {
+					builder.append(value.toString());
+				}
+			}
+			return builder.toString();
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			ElasticsearchUpsertSinkFunction that = (ElasticsearchUpsertSinkFunction) o;
+			return Objects.equals(index, that.index) &&
+				Objects.equals(docType, that.docType) &&
+				Objects.equals(keyDelimiter, that.keyDelimiter) &&
+				Objects.equals(keyNullLiteral, that.keyNullLiteral) &&
+				Objects.equals(serializationSchema, that.serializationSchema) &&
+				contentType == that.contentType &&
+				Objects.equals(requestFactory, that.requestFactory) &&
+				Arrays.equals(keyFieldIndices, that.keyFieldIndices);
+		}
+
+		@Override
+		public int hashCode() {
+			int result = Objects.hash(
+				index,
+				docType,
+				keyDelimiter,
+				keyNullLiteral,
+				serializationSchema,
+				contentType,
+				requestFactory);
+			result = 31 * result + Arrays.hashCode(keyFieldIndices);
+			return result;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
new file mode 100644
index 00000000000..79946deb708
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption;
+import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.ElasticsearchValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator;
+import org.apache.flink.table.factories.SerializationSchemaFactory;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_DELAY;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_MAX_RETRIES;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_TYPE;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_CONSTANT;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_DISABLED;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_EXPONENTIAL;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_INTERVAL;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_MAX_ACTIONS;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_MAX_SIZE;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_CONNECTION_PATH_PREFIX;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_DOCUMENT_TYPE;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_CLASS;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_FAIL;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_IGNORE;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_RETRY;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FLUSH_ON_CHECKPOINT;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS_HOSTNAME;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS_PORT;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS_PROTOCOL;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_INDEX;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_DELIMITER;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_NULL_LITERAL;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_TYPE_VALUE_ELASTICSEARCH;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+
+/**
+ * Version-agnostic table factory for creating an {@link UpsertStreamTableSink} for Elasticsearch.
+ */
+@Internal
+public abstract class ElasticsearchUpsertTableSinkFactoryBase implements StreamTableSinkFactory<Tuple2<Boolean, Row>> {
+
+	private static final String SUPPORTED_FORMAT_TYPE = "json";
+	private static final XContentType SUPPORTED_CONTENT_TYPE = XContentType.JSON;
+	private static final String DEFAULT_KEY_DELIMITER = "_";
+	private static final String DEFAULT_KEY_NULL_LITERAL = "null";
+	private static final String DEFAULT_FAILURE_HANDLER = CONNECTOR_FAILURE_HANDLER_VALUE_FAIL;
+
+	@Override
+	public Map<String, String> requiredContext() {
+		final Map<String, String> context = new HashMap<>();
+		context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_ELASTICSEARCH);
+		context.put(CONNECTOR_VERSION(), elasticsearchVersion());
+		context.put(CONNECTOR_PROPERTY_VERSION(), "1");
+		return context;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		final List<String> properties = new ArrayList<>();
+
+		// streaming properties
+		properties.add(UPDATE_MODE());
+
+		// Elasticsearch
+		properties.add(CONNECTOR_HOSTS + ".#." + CONNECTOR_HOSTS_HOSTNAME);
+		properties.add(CONNECTOR_HOSTS + ".#." + CONNECTOR_HOSTS_PORT);
+		properties.add(CONNECTOR_HOSTS + ".#." + CONNECTOR_HOSTS_PROTOCOL);
+		properties.add(CONNECTOR_INDEX);
+		properties.add(CONNECTOR_DOCUMENT_TYPE);
+		properties.add(CONNECTOR_KEY_DELIMITER);
+		properties.add(CONNECTOR_KEY_NULL_LITERAL);
+		properties.add(CONNECTOR_FAILURE_HANDLER);
+		properties.add(CONNECTOR_FAILURE_HANDLER_CLASS);
+		properties.add(CONNECTOR_FLUSH_ON_CHECKPOINT);
+		properties.add(CONNECTOR_BULK_FLUSH_MAX_ACTIONS);
+		properties.add(CONNECTOR_BULK_FLUSH_MAX_SIZE);
+		properties.add(CONNECTOR_BULK_FLUSH_INTERVAL);
+		properties.add(CONNECTOR_BULK_FLUSH_BACKOFF_TYPE);
+		properties.add(CONNECTOR_BULK_FLUSH_BACKOFF_MAX_RETRIES);
+		properties.add(CONNECTOR_BULK_FLUSH_BACKOFF_DELAY);
+		properties.add(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT);
+		properties.add(CONNECTOR_CONNECTION_PATH_PREFIX);
+
+		// schema
+		properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
+		properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
+
+		// format wildcard
+		properties.add(FORMAT() + ".*");
+
+		return properties;
+	}
+
+	@Override
+	public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+		return createElasticsearchUpsertTableSink(
+			descriptorProperties.isValue(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()),
+			descriptorProperties.getTableSchema(SCHEMA()),
+			getHosts(descriptorProperties),
+			descriptorProperties.getString(CONNECTOR_INDEX),
+			descriptorProperties.getString(CONNECTOR_DOCUMENT_TYPE),
+			descriptorProperties.getOptionalString(CONNECTOR_KEY_DELIMITER).orElse(DEFAULT_KEY_DELIMITER),
+			descriptorProperties.getOptionalString(CONNECTOR_KEY_NULL_LITERAL).orElse(DEFAULT_KEY_NULL_LITERAL),
+			getSerializationSchema(properties),
+			SUPPORTED_CONTENT_TYPE,
+			getFailureHandler(descriptorProperties),
+			getSinkOptions(descriptorProperties));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// For version-specific factories
+	// --------------------------------------------------------------------------------------------
+
+	protected abstract String elasticsearchVersion();
+
+	protected abstract ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
+		boolean isAppendOnly,
+		TableSchema schema,
+		List<Host> hosts,
+		String index,
+		String docType,
+		String keyDelimiter,
+		String keyNullLiteral,
+		SerializationSchema<Row> serializationSchema,
+		XContentType contentType,
+		ActionRequestFailureHandler failureHandler,
+		Map<SinkOption, String> sinkOptions);
+
+	// --------------------------------------------------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		descriptorProperties.putProperties(properties);
+
+		new StreamTableDescriptorValidator(true, false, true).validate(descriptorProperties);
+		new SchemaValidator(true, false, false).validate(descriptorProperties);
+		new ElasticsearchValidator().validate(descriptorProperties);
+
+		return descriptorProperties;
+	}
+
+	private List<Host> getHosts(DescriptorProperties descriptorProperties) {
+		final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
+			CONNECTOR_HOSTS,
+			Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
+		return hosts.stream()
+			.map(host -> new Host(
+				descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
+				descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
+				descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL))))
+			.collect(Collectors.toList());
+	}
+
+	private SerializationSchema<Row> getSerializationSchema(Map<String, String> properties) {
+		final String formatType = properties.get(FORMAT_TYPE());
+		// we could have added this check to the table factory context
+		// but this approach allows to throw more helpful error messages
+		// if the supported format has not been added
+		if (formatType == null || !formatType.equals(SUPPORTED_FORMAT_TYPE)) {
+			throw new ValidationException(
+				"The Elasticsearch sink requires a '" + SUPPORTED_FORMAT_TYPE + "' format.");
+		}
+
+		@SuppressWarnings("unchecked")
+		final SerializationSchemaFactory<Row> formatFactory = TableFactoryService.find(
+			SerializationSchemaFactory.class,
+			properties,
+			this.getClass().getClassLoader());
+		return formatFactory.createSerializationSchema(properties);
+	}
+
+	private ActionRequestFailureHandler getFailureHandler(DescriptorProperties descriptorProperties) {
+		final String failureHandler = descriptorProperties
+			.getOptionalString(CONNECTOR_FAILURE_HANDLER)
+			.orElse(DEFAULT_FAILURE_HANDLER);
+		switch (failureHandler) {
+			case CONNECTOR_FAILURE_HANDLER_VALUE_FAIL:
+				return new NoOpFailureHandler();
+			case CONNECTOR_FAILURE_HANDLER_VALUE_IGNORE:
+				return new IgnoringFailureHandler();
+			case CONNECTOR_FAILURE_HANDLER_VALUE_RETRY:
+				return new RetryRejectedExecutionFailureHandler();
+			case CONNECTOR_FAILURE_HANDLER_VALUE_CUSTOM:
+				final Class<? extends ActionRequestFailureHandler> clazz = descriptorProperties
+					.getClass(CONNECTOR_FAILURE_HANDLER_CLASS, ActionRequestFailureHandler.class);
+				return InstantiationUtil.instantiate(clazz);
+			default:
+				throw new IllegalArgumentException("Unknown failure handler.");
+		}
+	}
+
+	private Map<SinkOption, String> getSinkOptions(DescriptorProperties descriptorProperties) {
+		final Map<SinkOption, String> options = new HashMap<>();
+
+		descriptorProperties.getOptionalBoolean(CONNECTOR_FLUSH_ON_CHECKPOINT)
+			.ifPresent(v -> options.put(SinkOption.DISABLE_FLUSH_ON_CHECKPOINT, String.valueOf(!v)));
+
+		mapSinkOption(descriptorProperties, options, CONNECTOR_BULK_FLUSH_MAX_ACTIONS, SinkOption.BULK_FLUSH_MAX_ACTIONS);
+		mapSinkOption(descriptorProperties, options, CONNECTOR_BULK_FLUSH_MAX_SIZE, SinkOption.BULK_FLUSH_MAX_SIZE);
+		mapSinkOption(descriptorProperties, options, CONNECTOR_BULK_FLUSH_INTERVAL, SinkOption.BULK_FLUSH_INTERVAL);
+
+		descriptorProperties.getOptionalString(CONNECTOR_BULK_FLUSH_BACKOFF_TYPE)
+			.ifPresent(v -> {
+				options.put(
+					SinkOption.BULK_FLUSH_BACKOFF_ENABLED,
+					String.valueOf(!v.equals(CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_DISABLED)));
+				switch (v) {
+					case CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_CONSTANT:
+						options.put(
+							SinkOption.BULK_FLUSH_BACKOFF_TYPE,
+							ElasticsearchSinkBase.FlushBackoffType.CONSTANT.toString());
+						break;
+					case CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_EXPONENTIAL:
+						options.put(
+							SinkOption.BULK_FLUSH_BACKOFF_TYPE,
+							ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL.toString());
+						break;
+					default:
+						throw new IllegalArgumentException("Unknown backoff type.");
+				}
+			});
+
+		mapSinkOption(descriptorProperties, options, CONNECTOR_BULK_FLUSH_BACKOFF_MAX_RETRIES, SinkOption.BULK_FLUSH_BACKOFF_RETRIES);
+		mapSinkOption(descriptorProperties, options, CONNECTOR_BULK_FLUSH_BACKOFF_DELAY, SinkOption.BULK_FLUSH_BACKOFF_DELAY);
+		mapSinkOption(descriptorProperties, options, CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, SinkOption.REST_MAX_RETRY_TIMEOUT);
+		mapSinkOption(descriptorProperties, options, CONNECTOR_CONNECTION_PATH_PREFIX, SinkOption.REST_PATH_PREFIX);
+
+		return options;
+	}
+
+	private void mapSinkOption(
+			DescriptorProperties descriptorProperties,
+			Map<SinkOption, String> options,
+			String fromKey,
+			SinkOption toKey) {
+		descriptorProperties.getOptionalString(fromKey).ifPresent(v -> options.put(toKey, v));
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/IgnoringFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/IgnoringFailureHandler.java
new file mode 100644
index 00000000000..a3644b45d2a
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/IgnoringFailureHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+
+/**
+ * Ignores all kinds of failures and drops the affected {@link ActionRequest}.
+ */
+@Internal
+public class IgnoringFailureHandler implements ActionRequestFailureHandler {
+
+	private static final long serialVersionUID = 1662846593501L;
+
+	@Override
+	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
+		// ignore failure
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java
new file mode 100644
index 00000000000..f306e19282f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_DELAY;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_MAX_RETRIES;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_TYPE;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_INTERVAL;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_MAX_ACTIONS;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_MAX_SIZE;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_CONNECTION_PATH_PREFIX;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_DOCUMENT_TYPE;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_CLASS;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FLUSH_ON_CHECKPOINT;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS_HOSTNAME;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS_PORT;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS_PROTOCOL;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_INDEX;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_DELIMITER;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_NULL_LITERAL;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_TYPE_VALUE_ELASTICSEARCH;
+
+/**
+ * Connector descriptor for the Elasticsearch search engine.
+ */
+public class Elasticsearch extends ConnectorDescriptor {
+
+	private DescriptorProperties internalProperties = new DescriptorProperties(true);
+	private List<Host> hosts = new ArrayList<>();
+
+	/**
+	 * Connector descriptor for the Elasticsearch search engine.
+	 */
+	public Elasticsearch() {
+		super(CONNECTOR_TYPE_VALUE_ELASTICSEARCH, 1, true);
+	}
+
+	/**
+	 * Sets the Elasticsearch version to be used. Required.
+	 *
+	 * @param version Elasticsearch version. E.g., "6".
+	 */
+	public Elasticsearch version(String version) {
+		internalProperties.putString(CONNECTOR_VERSION(), version);
+		return this;
+	}
+
+	/**
+	 * Adds an Elasticsearch host to connect to. Required.
+	 *
+	 * <p>Multiple hosts can be declared by calling this method multiple times.
+	 *
+	 * @param hostname connection hostname
+	 * @param port connection port
+	 * @param protocol connection protocol; e.g. "http"
+	 */
+	public Elasticsearch host(String hostname, int port, String protocol) {
+		final Host host =
+			new Host(
+				Preconditions.checkNotNull(hostname),
+				port,
+				Preconditions.checkNotNull(protocol));
+		hosts.add(host);
+		return this;
+	}
+
+	/**
+	 * Declares the Elasticsearch index for every record. Required.
+	 *
+	 * @param index Elasticsearch index
+	 */
+	public Elasticsearch index(String index) {
+		internalProperties.putString(CONNECTOR_INDEX, index);
+		return this;
+	}
+
+	/**
+	 * Declares the Elasticsearch document type for every record. Required.
+	 *
+	 * @param documentType Elasticsearch document type
+	 */
+	public Elasticsearch documentType(String documentType) {
+		internalProperties.putString(CONNECTOR_DOCUMENT_TYPE, documentType);
+		return this;
+	}
+
+	/**
+	 * Sets a custom key delimiter in case the Elasticsearch ID needs to be constructed from
+	 * multiple fields. Optional.
+	 *
+	 * @param keyDelimiter key delimiter; e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+	 */
+	public Elasticsearch keyDelimiter(String keyDelimiter) {
+		internalProperties.putString(CONNECTOR_KEY_DELIMITER, keyDelimiter);
+		return this;
+	}
+
+	/**
+	 * Sets a custom representation for null fields in keys. Optional.
+	 *
+	 * @param keyNullLiteral key null literal string; e.g. "N/A" would result in IDs "KEY1_N/A_KEY3"
+	 */
+	public Elasticsearch keyNullLiteral(String keyNullLiteral) {
+		internalProperties.putString(CONNECTOR_KEY_NULL_LITERAL, keyNullLiteral);
+		return this;
+	}
+
+	/**
+	 * Configures a failure handling strategy in case a request to Elasticsearch fails.
+	 *
+	 * <p>This strategy throws an exception if a request fails and thus causes a job failure.
+	 */
+	public Elasticsearch failureHandlerFail() {
+		internalProperties.putString(CONNECTOR_FAILURE_HANDLER, ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_FAIL);
+		return this;
+	}
+
+	/**
+	 * Configures a failure handling strategy in case a request to Elasticsearch fails.
+	 *
+	 * <p>This strategy ignores failures and drops the request.
+	 */
+	public Elasticsearch failureHandlerIgnore() {
+		internalProperties.putString(CONNECTOR_FAILURE_HANDLER, ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_IGNORE);
+		return this;
+	}
+
+	/**
+	 * Configures a failure handling strategy in case a request to Elasticsearch fails.
+	 *
+	 * <p>This strategy re-adds requests that have failed due to queue capacity saturation.
+	 */
+	public Elasticsearch failureHandlerRetryRejected() {
+		internalProperties.putString(CONNECTOR_FAILURE_HANDLER, ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_RETRY);
+		return this;
+	}
+
+	/**
+	 * Configures a failure handling strategy in case a request to Elasticsearch fails.
+	 *
+	 * <p>This strategy allows for custom failure handling using a {@link ActionRequestFailureHandler}.
+	 */
+	public Elasticsearch failureHandlerCustom(Class<? extends ActionRequestFailureHandler> failureHandlerClass) {
+		internalProperties.putString(CONNECTOR_FAILURE_HANDLER, ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_CUSTOM);
+		internalProperties.putClass(CONNECTOR_FAILURE_HANDLER_CLASS, failureHandlerClass);
+		return this;
+	}
+
+	/**
+	 * Disables flushing on checkpoint. When disabled, a sink will not wait for all pending action
+	 * requests to be acknowledged by Elasticsearch on checkpoints.
+	 *
+	 * <p>Note: If flushing on checkpoint is disabled, an Elasticsearch sink does NOT
+	 * provide any strong guarantees for at-least-once delivery of action requests.
+	 */
+	public Elasticsearch disableFlushOnCheckpoint() {
+		internalProperties.putBoolean(CONNECTOR_FLUSH_ON_CHECKPOINT, false);
+		return this;
+	}
+
+	/**
+	 * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+	 *
+	 * <p>Sets the maximum number of actions to buffer for each bulk request.
+	 *
+	 * @param maxActions the maximum number of actions to buffer per bulk request.
+	 */
+	public Elasticsearch bulkFlushMaxActions(int maxActions) {
+		internalProperties.putInt(CONNECTOR_BULK_FLUSH_MAX_ACTIONS, maxActions);
+		return this;
+	}
+
+	/**
+	 * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+	 *
+	 * <p>Sets the maximum size of buffered actions per bulk request (using the syntax of {@link MemorySize}).
+	 */
+	public Elasticsearch bulkFlushMaxSize(String maxSize) {
+		internalProperties.putMemorySize(CONNECTOR_BULK_FLUSH_MAX_SIZE, MemorySize.parse(maxSize, MemorySize.MemoryUnit.BYTES));
+		return this;
+	}
+
+	/**
+	 * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+	 *
+	 * <p>Sets the bulk flush interval (in milliseconds).
+	 *
+	 * @param interval bulk flush interval (in milliseconds).
+	 */
+	public Elasticsearch bulkFlushInterval(long interval) {
+		internalProperties.putLong(CONNECTOR_BULK_FLUSH_INTERVAL, interval);
+		return this;
+	}
+
+	/**
+	 * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+	 *
+	 * <p>Sets a constant backoff type to use when flushing bulk requests.
+	 */
+	public Elasticsearch bulkFlushBackoffConstant() {
+		internalProperties.putString(
+			CONNECTOR_BULK_FLUSH_BACKOFF_TYPE,
+			ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_CONSTANT);
+		return this;
+	}
+
+	/**
+	 * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+	 *
+	 * <p>Sets an exponential backoff type to use when flushing bulk requests.
+	 */
+	public Elasticsearch bulkFlushBackoffExponential() {
+		internalProperties.putString(
+			CONNECTOR_BULK_FLUSH_BACKOFF_TYPE,
+			ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_EXPONENTIAL);
+		return this;
+	}
+
+	/**
+	 * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+	 *
+	 * <p>Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+	 *
+	 * <p>Make sure to enable backoff by selecting a strategy ({@link #bulkFlushBackoffConstant()} or
+	 * {@link #bulkFlushBackoffExponential()}).
+	 *
+	 * @param maxRetries the maximum number of retries.
+	 */
+	public Elasticsearch bulkFlushBackoffMaxRetries(int maxRetries) {
+		internalProperties.putInt(CONNECTOR_BULK_FLUSH_BACKOFF_MAX_RETRIES, maxRetries);
+		return this;
+	}
+
+	/**
+	 * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+	 *
+	 * <p>Sets the amount of delay between each backoff attempt when flushing bulk requests (in milliseconds).
+	 *
+	 * <p>Make sure to enable backoff by selecting a strategy ({@link #bulkFlushBackoffConstant()} or
+	 * {@link #bulkFlushBackoffExponential()}).
+	 *
+	 * @param delay delay between each backoff attempt (in milliseconds).
+	 */
+	public Elasticsearch bulkFlushBackoffDelay(long delay) {
+		internalProperties.putLong(CONNECTOR_BULK_FLUSH_BACKOFF_DELAY, delay);
+		return this;
+	}
+
+	/**
+	 * Sets connection properties to be used during REST communication to Elasticsearch.
+	 *
+	 * <p>Sets the maximum timeout (in milliseconds) in case of multiple retries of the same request.
+	 *
+	 * @param maxRetryTimeout maximum timeout (in milliseconds)
+	 */
+	public Elasticsearch connectionMaxRetryTimeout(int maxRetryTimeout) {
+		internalProperties.putInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, maxRetryTimeout);
+		return this;
+	}
+
+	/**
+	 * Sets connection properties to be used during REST communication to Elasticsearch.
+	 *
+	 * <p>Adds a path prefix to every REST communication.
+	 *
+	 * @param pathPrefix prefix string to be added to every REST communication
+	 */
+	public Elasticsearch connectionPathPrefix(String pathPrefix) {
+		internalProperties.putString(CONNECTOR_CONNECTION_PATH_PREFIX, pathPrefix);
+		return this;
+	}
+
+	@Override
+	public void addConnectorProperties(DescriptorProperties properties) {
+		properties.putProperties(internalProperties.asMap());
+
+		final List<List<String>> hostValues = hosts.stream()
+			.map(host -> Arrays.asList(host.hostname, String.valueOf(host.port), host.protocol))
+			.collect(Collectors.toList());
+		properties.putIndexedFixedProperties(
+			CONNECTOR_HOSTS,
+			Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL),
+			hostValues);
+	}
+}
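
For orientation before the next file: the failure-handler, bulk-flush, and connection methods above are plain setters onto the descriptor's internal property map. A minimal usage sketch, chaining them the same way the descriptor tests further down in this diff do (all concrete values are placeholders taken from those tests):

    // Builds the connector descriptor; required settings first, then the
    // optional failure-handling, bulk-flush and connection settings from this file.
    new Elasticsearch()
        .version("6")
        .host("localhost", 9200, "http")
        .index("my_users")
        .documentType("user")
        .failureHandlerRetryRejected()
        .bulkFlushMaxActions(1000)
        .bulkFlushMaxSize("1 MB")
        .bulkFlushInterval(100L)
        .bulkFlushBackoffExponential()
        .bulkFlushBackoffMaxRetries(3)
        .bulkFlushBackoffDelay(123L)
        .connectionMaxRetryTimeout(100)
        .connectionPathPrefix("/myapp");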
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
new file mode 100644
index 00000000000..9e04f2f798f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * The validator for {@link Elasticsearch}.
+ */
+public class ElasticsearchValidator extends ConnectorDescriptorValidator {
+
+	public static final String CONNECTOR_TYPE_VALUE_ELASTICSEARCH = "elasticsearch";
+	public static final String CONNECTOR_VERSION_VALUE_6 = "6";
+	public static final String CONNECTOR_HOSTS = "connector.hosts";
+	public static final String CONNECTOR_HOSTS_HOSTNAME = "hostname";
+	public static final String CONNECTOR_HOSTS_PORT = "port";
+	public static final String CONNECTOR_HOSTS_PROTOCOL = "protocol";
+	public static final String CONNECTOR_INDEX = "connector.index";
+	public static final String CONNECTOR_DOCUMENT_TYPE = "connector.document-type";
+	public static final String CONNECTOR_KEY_DELIMITER = "connector.key-delimiter";
+	public static final String CONNECTOR_KEY_NULL_LITERAL = "connector.key-null-literal";
+	public static final String CONNECTOR_FAILURE_HANDLER = "connector.failure-handler";
+	public static final String CONNECTOR_FAILURE_HANDLER_VALUE_FAIL = "fail";
+	public static final String CONNECTOR_FAILURE_HANDLER_VALUE_IGNORE = "ignore";
+	public static final String CONNECTOR_FAILURE_HANDLER_VALUE_RETRY = "retry-rejected";
+	public static final String CONNECTOR_FAILURE_HANDLER_VALUE_CUSTOM = "custom";
+	public static final String CONNECTOR_FAILURE_HANDLER_CLASS = "connector.failure-handler-class";
+	public static final String CONNECTOR_FLUSH_ON_CHECKPOINT = "connector.flush-on-checkpoint";
+	public static final String CONNECTOR_BULK_FLUSH_MAX_ACTIONS = "connector.bulk-flush.max-actions";
+	public static final String CONNECTOR_BULK_FLUSH_MAX_SIZE = "connector.bulk-flush.max-size";
+	public static final String CONNECTOR_BULK_FLUSH_INTERVAL = "connector.bulk-flush.interval";
+	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_TYPE = "connector.bulk-flush.backoff.type";
+	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_DISABLED = "disabled";
+	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_CONSTANT = "constant";
+	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_EXPONENTIAL = "exponential";
+	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_MAX_RETRIES = "connector.bulk-flush.backoff.max-retries";
+	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_DELAY = "connector.bulk-flush.backoff.delay";
+	public static final String CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT = "connector.connection-max-retry-timeout";
+	public static final String CONNECTOR_CONNECTION_PATH_PREFIX = "connector.connection-path-prefix";
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		super.validate(properties);
+		properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_ELASTICSEARCH, false);
+		validateVersion(properties);
+		validateHosts(properties);
+		validateGeneralProperties(properties);
+		validateFailureHandler(properties);
+		validateBulkFlush(properties);
+		validateConnectionProperties(properties);
+	}
+
+	private void validateVersion(DescriptorProperties properties) {
+		properties.validateEnumValues(
+			CONNECTOR_VERSION(),
+			false,
+			Collections.singletonList(CONNECTOR_VERSION_VALUE_6));
+	}
+
+	private void validateHosts(DescriptorProperties properties) {
+		final Map<String, Consumer<String>> hostsValidators = new HashMap<>();
+		hostsValidators.put(
+			CONNECTOR_HOSTS_HOSTNAME,
+			(prefix) -> properties.validateString(prefix + CONNECTOR_HOSTS_HOSTNAME, false, 1));
+		hostsValidators.put(
+			CONNECTOR_HOSTS_PORT,
+			(prefix) -> properties.validateInt(prefix + CONNECTOR_HOSTS_PORT, false, 0, 65535));
+		hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL,
+			(prefix) -> properties.validateString(prefix + CONNECTOR_HOSTS_PROTOCOL, false, 1));
+		properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, hostsValidators);
+	}
+
+	private void validateGeneralProperties(DescriptorProperties properties) {
+		properties.validateString(CONNECTOR_INDEX, false, 1);
+		properties.validateString(CONNECTOR_DOCUMENT_TYPE, false, 1);
+		properties.validateString(CONNECTOR_KEY_DELIMITER, true);
+		properties.validateString(CONNECTOR_KEY_NULL_LITERAL, true);
+	}
+
+	private void validateFailureHandler(DescriptorProperties properties) {
+		final Map<String, Consumer<String>> failureHandlerValidators = new HashMap<>();
+		failureHandlerValidators.put(CONNECTOR_FAILURE_HANDLER_VALUE_FAIL, properties.noValidation());
+		failureHandlerValidators.put(CONNECTOR_FAILURE_HANDLER_VALUE_IGNORE, properties.noValidation());
+		failureHandlerValidators.put(CONNECTOR_FAILURE_HANDLER_VALUE_RETRY, properties.noValidation());
+		failureHandlerValidators.put(CONNECTOR_FAILURE_HANDLER_VALUE_CUSTOM,
+			prefix -> properties.validateString(CONNECTOR_FAILURE_HANDLER_CLASS, false, 1));
+		properties.validateEnum(CONNECTOR_FAILURE_HANDLER, true, failureHandlerValidators);
+	}
+
+	private void validateBulkFlush(DescriptorProperties properties) {
+		properties.validateBoolean(CONNECTOR_FLUSH_ON_CHECKPOINT, true);
+		properties.validateInt(CONNECTOR_BULK_FLUSH_MAX_ACTIONS, true, 1);
+		properties.validateMemorySize(CONNECTOR_BULK_FLUSH_MAX_SIZE, true, 1024 * 1024); // only allow MB precision
+		properties.validateLong(CONNECTOR_BULK_FLUSH_INTERVAL, true, 0);
+		properties.validateEnumValues(CONNECTOR_BULK_FLUSH_BACKOFF_TYPE,
+			true,
+			Arrays.asList(
+				CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_DISABLED,
+				CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_CONSTANT,
+				CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_EXPONENTIAL));
+		properties.validateInt(CONNECTOR_BULK_FLUSH_BACKOFF_MAX_RETRIES, true, 1);
+		properties.validateLong(CONNECTOR_BULK_FLUSH_BACKOFF_DELAY, true, 0);
+	}
+
+	private void validateConnectionProperties(DescriptorProperties properties) {
+		properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1);
+		properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true);
+	}
+}
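
The validator above operates on the flattened string properties that the descriptor emits. A rough sketch of the key space it checks, assuming the usual DescriptorProperties.putProperties entry point used by the descriptor above (values are examples from the tests further down):

    // Minimal flat property set accepted by ElasticsearchValidator.
    final Map<String, String> props = new HashMap<>();
    props.put("connector.type", "elasticsearch");
    props.put("connector.version", "6");
    props.put("connector.hosts.0.hostname", "localhost");
    props.put("connector.hosts.0.port", "9200");
    props.put("connector.hosts.0.protocol", "http");
    props.put("connector.index", "my_users");
    props.put("connector.document-type", "user");

    final DescriptorProperties properties = new DescriptorProperties();
    properties.putProperties(props);
    new ElasticsearchValidator().validate(properties);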
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java
new file mode 100644
index 00000000000..9376bfb4f42
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Elasticsearch;
+import org.apache.flink.table.descriptors.Json;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.TestTableDescriptor;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLogger;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Version-agnostic test base for {@link ElasticsearchUpsertTableSinkFactoryBase}.
+ */
+public abstract class ElasticsearchUpsertTableSinkFactoryTestBase extends TestLogger {
+
+	protected static final String HOSTNAME = "host1";
+	protected static final int PORT = 1234;
+	protected static final String SCHEMA = "https";
+	protected static final String INDEX = "MyIndex";
+	protected static final String DOC_TYPE = "MyType";
+	protected static final String KEY_DELIMITER = "#";
+	protected static final String KEY_NULL_LITERAL = "";
+
+	private static final String FIELD_KEY = "key";
+	private static final String FIELD_FRUIT_NAME = "fruit_name";
+	private static final String FIELD_COUNT = "count";
+	private static final String FIELD_TS = "ts";
+
+	@Test
+	public void testTableSink() {
+		// prepare parameters for Elasticsearch table sink
+
+		final TableSchema schema = createTestSchema();
+
+		final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
+			false,
+			schema,
+			Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
+			INDEX,
+			DOC_TYPE,
+			KEY_DELIMITER,
+			KEY_NULL_LITERAL,
+			new JsonRowSerializationSchema(schema.toRowType()),
+			XContentType.JSON,
+			new DummyFailureHandler(),
+			createTestSinkOptions());
+
+		// construct table sink using descriptors and table sink factory
+
+		final TestTableDescriptor testDesc = new TestTableDescriptor(
+				new Elasticsearch()
+					.version(getElasticsearchVersion())
+					.host(HOSTNAME, PORT, SCHEMA)
+					.index(INDEX)
+					.documentType(DOC_TYPE)
+					.keyDelimiter(KEY_DELIMITER)
+					.keyNullLiteral(KEY_NULL_LITERAL)
+					.bulkFlushBackoffExponential()
+					.bulkFlushBackoffDelay(123L)
+					.bulkFlushBackoffMaxRetries(3)
+					.bulkFlushInterval(100L)
+					.bulkFlushMaxActions(1000)
+					.bulkFlushMaxSize("1 MB")
+					.failureHandlerCustom(DummyFailureHandler.class)
+					.connectionMaxRetryTimeout(100)
+					.connectionPathPrefix("/myapp"))
+			.withFormat(
+				new Json()
+					.deriveSchema())
+			.withSchema(
+				new Schema()
+					.field(FIELD_KEY, Types.LONG())
+					.field(FIELD_FRUIT_NAME, Types.STRING())
+					.field(FIELD_COUNT, Types.DECIMAL())
+					.field(FIELD_TS, Types.SQL_TIMESTAMP()))
+			.inUpsertMode();
+
+		final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(testDesc);
+		final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
+			.createStreamTableSink(propertiesMap);
+
+		assertEquals(expectedSink, actualSink);
+	}
+
+	protected TableSchema createTestSchema() {
+		return TableSchema.builder()
+			.field(FIELD_KEY, Types.LONG())
+			.field(FIELD_FRUIT_NAME, Types.STRING())
+			.field(FIELD_COUNT, Types.DECIMAL())
+			.field(FIELD_TS, Types.SQL_TIMESTAMP())
+			.build();
+	}
+
+	protected Map<SinkOption, String> createTestSinkOptions() {
+		final Map<SinkOption, String> sinkOptions = new HashMap<>();
+		sinkOptions.put(SinkOption.BULK_FLUSH_BACKOFF_ENABLED, "true");
+		sinkOptions.put(SinkOption.BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");
+		sinkOptions.put(SinkOption.BULK_FLUSH_BACKOFF_DELAY, "123");
+		sinkOptions.put(SinkOption.BULK_FLUSH_BACKOFF_RETRIES, "3");
+		sinkOptions.put(SinkOption.BULK_FLUSH_INTERVAL, "100");
+		sinkOptions.put(SinkOption.BULK_FLUSH_MAX_ACTIONS, "1000");
+		sinkOptions.put(SinkOption.BULK_FLUSH_MAX_SIZE, "1048576 bytes");
+		sinkOptions.put(SinkOption.REST_MAX_RETRY_TIMEOUT, "100");
+		sinkOptions.put(SinkOption.REST_PATH_PREFIX, "/myapp");
+		return sinkOptions;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// For version-specific tests
+	// --------------------------------------------------------------------------------------------
+
+	protected abstract String getElasticsearchVersion();
+
+	protected abstract ElasticsearchUpsertTableSinkBase getExpectedTableSink(
+		boolean isAppendOnly,
+		TableSchema schema,
+		List<Host> hosts,
+		String index,
+		String docType,
+		String keyDelimiter,
+		String keyNullLiteral,
+		SerializationSchema<Row> serializationSchema,
+		XContentType contentType,
+		ActionRequestFailureHandler failureHandler,
+		Map<SinkOption, String> sinkOptions);
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Custom failure handler for testing.
+	 */
+	public static class DummyFailureHandler implements ActionRequestFailureHandler {
+
+		@Override
+		public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
+			// do nothing
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			return this == o || o instanceof DummyFailureHandler;
+		}
+
+		@Override
+		public int hashCode() {
+			return DummyFailureHandler.class.hashCode();
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/table/descriptors/ElasticsearchTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/table/descriptors/ElasticsearchTest.java
new file mode 100644
index 00000000000..c97aa0e66e0
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/table/descriptors/ElasticsearchTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link Elasticsearch} descriptor.
+ */
+public class ElasticsearchTest extends DescriptorTestBase {
+
+	@Test(expected = ValidationException.class)
+	public void testMissingIndex() {
+		removePropertyAndVerify(descriptors().get(0), "connector.index");
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testInvalidFailureHandler() {
+		addPropertyAndVerify(descriptors().get(0), "connector.failure-handler", "invalid handler");
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testInvalidMemorySize() {
+		addPropertyAndVerify(descriptors().get(1), "connector.bulk-flush.max-size", "12 bytes");
+	}
+
+	@Override
+	public List<Descriptor> descriptors() {
+		final Descriptor minimumDesc =
+			new Elasticsearch()
+				.version("6")
+				.host("localhost", 1234, "http")
+				.index("MyIndex")
+				.documentType("MyType");
+
+		final Descriptor maximumDesc =
+			new Elasticsearch()
+				.version("6")
+				.host("host1", 1234, "https")
+				.host("host2", 1234, "https")
+				.index("MyIndex")
+				.documentType("MyType")
+				.keyDelimiter("#")
+				.keyNullLiteral("")
+				.bulkFlushBackoffExponential()
+				.bulkFlushBackoffDelay(123L)
+				.bulkFlushBackoffMaxRetries(3)
+				.bulkFlushInterval(100L)
+				.bulkFlushMaxActions(1000)
+				.bulkFlushMaxSize("12 MB")
+				.failureHandlerRetryRejected()
+				.connectionMaxRetryTimeout(100)
+				.connectionPathPrefix("/myapp");
+
+		final Descriptor customDesc =
+			new Elasticsearch()
+				.version("6")
+				.host("localhost", 1234, "http")
+				.index("MyIndex")
+				.documentType("MyType")
+				.disableFlushOnCheckpoint()
+				.failureHandlerCustom(NoOpFailureHandler.class);
+
+		return Arrays.asList(minimumDesc, maximumDesc, customDesc);
+	}
+
+	@Override
+	public List<Map<String, String>> properties() {
+		final Map<String, String> minimumDesc = new HashMap<>();
+		minimumDesc.put("connector.property-version", "1");
+		minimumDesc.put("connector.type", "elasticsearch");
+		minimumDesc.put("connector.version", "6");
+		minimumDesc.put("connector.hosts.0.hostname", "localhost");
+		minimumDesc.put("connector.hosts.0.port", "1234");
+		minimumDesc.put("connector.hosts.0.protocol", "http");
+		minimumDesc.put("connector.index", "MyIndex");
+		minimumDesc.put("connector.document-type", "MyType");
+
+		final Map<String, String> maximumDesc = new HashMap<>();
+		maximumDesc.put("connector.property-version", "1");
+		maximumDesc.put("connector.type", "elasticsearch");
+		maximumDesc.put("connector.version", "6");
+		maximumDesc.put("connector.hosts.0.hostname", "host1");
+		maximumDesc.put("connector.hosts.0.port", "1234");
+		maximumDesc.put("connector.hosts.0.protocol", "https");
+		maximumDesc.put("connector.hosts.1.hostname", "host2");
+		maximumDesc.put("connector.hosts.1.port", "1234");
+		maximumDesc.put("connector.hosts.1.protocol", "https");
+		maximumDesc.put("connector.index", "MyIndex");
+		maximumDesc.put("connector.document-type", "MyType");
+		maximumDesc.put("connector.key-delimiter", "#");
+		maximumDesc.put("connector.key-null-literal", "");
+		maximumDesc.put("connector.bulk-flush.backoff.type", "exponential");
+		maximumDesc.put("connector.bulk-flush.backoff.delay", "123");
+		maximumDesc.put("connector.bulk-flush.backoff.max-retries", "3");
+		maximumDesc.put("connector.bulk-flush.interval", "100");
+		maximumDesc.put("connector.bulk-flush.max-actions", "1000");
+		maximumDesc.put("connector.bulk-flush.max-size", "12582912 bytes");
+		maximumDesc.put("connector.failure-handler", "retry-rejected");
+		maximumDesc.put("connector.connection-max-retry-timeout", "100");
+		maximumDesc.put("connector.connection-path-prefix", "/myapp");
+
+		final Map<String, String> customDesc = new HashMap<>();
+		customDesc.put("connector.property-version", "1");
+		customDesc.put("connector.type", "elasticsearch");
+		customDesc.put("connector.version", "6");
+		customDesc.put("connector.hosts.0.hostname", "localhost");
+		customDesc.put("connector.hosts.0.port", "1234");
+		customDesc.put("connector.hosts.0.protocol", "http");
+		customDesc.put("connector.index", "MyIndex");
+		customDesc.put("connector.document-type", "MyType");
+		customDesc.put("connector.flush-on-checkpoint", "false");
+		customDesc.put("connector.failure-handler", "custom");
+		customDesc.put("connector.failure-handler-class", NoOpFailureHandler.class.getName());
+
+		return Arrays.asList(minimumDesc, maximumDesc, customDesc);
+	}
+
+	@Override
+	public DescriptorValidator validator() {
+		return new ElasticsearchValidator();
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
index ef06d80512b..427ad5ce4dc 100644
--- a/flink-connectors/flink-connector-elasticsearch6/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -84,6 +84,16 @@ under the License.
 			<version>2.9.1</version>
 		</dependency>
 
+		<!-- Used for the Elasticsearch table sink. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
@@ -140,20 +150,119 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-api</artifactId>
+			<artifactId>log4j-core</artifactId>
 			<version>2.9.1</version>
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Elasticsearch table descriptor testing -->
 		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-core</artifactId>
-			<version>2.9.1</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- Elasticsearch table sink factory testing -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 	</dependencies>
 
+	<profiles>
+		<!-- Create SQL Client uber jars by default -->
+		<profile>
+			<id>sql-jars</id>
+			<activation>
+				<property>
+					<name>!skipSqlJars</name>
+				</property>
+			</activation>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-shade-plugin</artifactId>
+						<executions>
+							<execution>
+								<phase>package</phase>
+								<goals>
+									<goal>shade</goal>
+								</goals>
+								<configuration>
+									<shadedArtifactAttached>true</shadedArtifactAttached>
+									<shadedClassifierName>sql-jar</shadedClassifierName>
+									<filters>
+										<filter>
+											<artifact>*:*</artifact>
+											<!-- It is difficult to find out artifacts that are really required by ES. -->
+											<!-- We use hard filters for now to clean up the SQL JAR. -->
+											<excludes>
+												<exclude>com/carrotsearch/**</exclude>
+												<exclude>com/sun/**</exclude>
+												<exclude>com/tdunning/**</exclude>
+												<exclude>config/**</exclude>
+												<exclude>forbidden/**</exclude>
+												<exclude>joptsimple/**</exclude>
+												<exclude>META-INF/services/com.fasterxml.**</exclude>
+												<exclude>META-INF/services/org.apache.lucene.**</exclude>
+												<exclude>META-INF/services/org.elasticsearch.**</exclude>
+												<exclude>META-INF/versions/**</exclude>
+												<exclude>modules.txt</exclude>
+												<exclude>mozilla/**</exclude>
+												<exclude>org/HdrHistogram/**</exclude>
+												<exclude>org/joda/**</exclude>
+												<exclude>org/tartarus/**</exclude>
+												<exclude>org/yaml/**</exclude>
+												<exclude>plugins.txt</exclude>
+											</excludes>
+										</filter>
+									</filters>
+									<relocations>
+										<!-- Force relocation of all Elasticsearch dependencies. -->
+										<relocation>
+											<pattern>org.apache.commons</pattern>
+											<shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.commons</shadedPattern>
+										</relocation>
+										<relocation>
+											<pattern>org.apache.http</pattern>
+											<shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.http</shadedPattern>
+										</relocation>
+										<relocation>
+											<pattern>org.apache.lucene</pattern>
+											<shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.lucene</shadedPattern>
+										</relocation>
+										<relocation>
+											<pattern>org.elasticsearch</pattern>
+											<shadedPattern>org.apache.flink.elasticsearch6.shaded.org.elasticsearch</shadedPattern>
+										</relocation>
+										<relocation>
+											<pattern>org.apache.logging</pattern>
+											<shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.logging</shadedPattern>
+										</relocation>
+										<relocation>
+											<pattern>com.fasterxml.jackson</pattern>
+											<shadedPattern>org.apache.flink.elasticsearch6.shaded.com.fasterxml.jackson</shadedPattern>
+										</relocation>
+									</relocations>
+									<!-- Relocate the table format factory service file. -->
+									<transformers>
+										<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+									</transformers>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+	</profiles>
+
 	<build>
 		<plugins>
 			<!--
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSink.java
new file mode 100644
index 00000000000..4149d14c23a
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSink.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_DELAY;
+import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_ENABLED;
+import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_RETRIES;
+import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_TYPE;
+import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_INTERVAL;
+import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_ACTIONS;
+import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_SIZE;
+import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.DISABLE_FLUSH_ON_CHECKPOINT;
+import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.REST_MAX_RETRY_TIMEOUT;
+import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.REST_PATH_PREFIX;
+
+/**
+ * Version-specific upsert table sink for Elasticsearch 6.
+ */
+@Internal
+public class Elasticsearch6UpsertTableSink extends ElasticsearchUpsertTableSinkBase {
+
+	@VisibleForTesting
+	static final RequestFactory UPDATE_REQUEST_FACTORY =
+		new Elasticsearch6RequestFactory();
+
+	public Elasticsearch6UpsertTableSink(
+			boolean isAppendOnly,
+			TableSchema schema,
+			List<Host> hosts,
+			String index,
+			String docType,
+			String keyDelimiter,
+			String keyNullLiteral,
+			SerializationSchema<Row> serializationSchema,
+			XContentType contentType,
+			ActionRequestFailureHandler failureHandler,
+			Map<SinkOption, String> sinkOptions) {
+
+		super(
+			isAppendOnly,
+			schema,
+			hosts,
+			index,
+			docType,
+			keyDelimiter,
+			keyNullLiteral,
+			serializationSchema,
+			contentType,
+			failureHandler,
+			sinkOptions,
+			UPDATE_REQUEST_FACTORY);
+	}
+
+	@Override
+	protected ElasticsearchUpsertTableSinkBase copy(
+			boolean isAppendOnly,
+			TableSchema schema,
+			List<Host> hosts,
+			String index,
+			String docType,
+			String keyDelimiter,
+			String keyNullLiteral,
+			SerializationSchema<Row> serializationSchema,
+			XContentType contentType,
+			ActionRequestFailureHandler failureHandler,
+			Map<SinkOption, String> sinkOptions,
+			RequestFactory requestFactory) {
+
+		return new Elasticsearch6UpsertTableSink(
+			isAppendOnly,
+			schema,
+			hosts,
+			index,
+			docType,
+			keyDelimiter,
+			keyNullLiteral,
+			serializationSchema,
+			contentType,
+			failureHandler,
+			sinkOptions);
+	}
+
+	@Override
+	protected SinkFunction<Tuple2<Boolean, Row>> createSinkFunction(
+			List<Host> hosts,
+			ActionRequestFailureHandler failureHandler,
+			Map<SinkOption, String> sinkOptions,
+			ElasticsearchUpsertSinkFunction upsertSinkFunction) {
+
+		final List<HttpHost> httpHosts = hosts.stream()
+			.map((host) -> new HttpHost(host.hostname, host.port, host.protocol))
+			.collect(Collectors.toList());
+
+		final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> builder = createBuilder(upsertSinkFunction, httpHosts);
+
+		builder.setFailureHandler(failureHandler);
+
+		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_MAX_ACTIONS))
+			.ifPresent(v -> builder.setBulkFlushMaxActions(Integer.valueOf(v)));
+
+		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_MAX_SIZE))
+			.ifPresent(v -> builder.setBulkFlushMaxSizeMb(MemorySize.parse(v).getMebiBytes()));
+
+		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_INTERVAL))
+			.ifPresent(v -> builder.setBulkFlushInterval(Long.valueOf(v)));
+
+		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_BACKOFF_ENABLED))
+			.ifPresent(v -> builder.setBulkFlushBackoff(Boolean.valueOf(v)));
+
+		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_BACKOFF_TYPE))
+			.ifPresent(v -> builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.valueOf(v)));
+
+		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_BACKOFF_RETRIES))
+			.ifPresent(v -> builder.setBulkFlushBackoffRetries(Integer.valueOf(v)));
+
+		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_BACKOFF_DELAY))
+			.ifPresent(v -> builder.setBulkFlushBackoffDelay(Long.valueOf(v)));
+
+		builder.setRestClientFactory(
+			new DefaultRestClientFactory(
+				Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT))
+					.map(Integer::valueOf)
+					.orElse(null),
+				sinkOptions.get(REST_PATH_PREFIX)));
+
+		final ElasticsearchSink<Tuple2<Boolean, Row>> sink = builder.build();
+
+		Optional.ofNullable(sinkOptions.get(DISABLE_FLUSH_ON_CHECKPOINT))
+			.ifPresent(v -> {
+				if (Boolean.valueOf(v)) {
+					sink.disableFlushOnCheckpoint();
+				}
+			});
+
+		return sink;
+	}
+
+	@VisibleForTesting
+	ElasticsearchSink.Builder<Tuple2<Boolean, Row>> createBuilder(
+			ElasticsearchUpsertSinkFunction upsertSinkFunction,
+			List<HttpHost> httpHosts) {
+		return new ElasticsearchSink.Builder<>(httpHosts, upsertSinkFunction);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Serializable {@link RestClientFactory} used by the sink.
+	 */
+	@VisibleForTesting
+	static class DefaultRestClientFactory implements RestClientFactory {
+
+		private Integer maxRetryTimeout;
+		private String pathPrefix;
+
+		public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout, @Nullable String pathPrefix) {
+			this.maxRetryTimeout = maxRetryTimeout;
+			this.pathPrefix = pathPrefix;
+		}
+
+		@Override
+		public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+			if (maxRetryTimeout != null) {
+				restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);
+			}
+			if (pathPrefix != null) {
+				restClientBuilder.setPathPrefix(pathPrefix);
+			}
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+			return Objects.equals(maxRetryTimeout, that.maxRetryTimeout) &&
+				Objects.equals(pathPrefix, that.pathPrefix);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(
+				maxRetryTimeout,
+				pathPrefix);
+		}
+	}
+
+	/**
+	 * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink.
+	 */
+	private static class Elasticsearch6RequestFactory implements RequestFactory {
+
+		@Override
+		public UpdateRequest createUpdateRequest(
+				String index,
+				String docType,
+				String key,
+				XContentType contentType,
+				byte[] document) {
+			return new UpdateRequest(index, docType, key)
+				.doc(document, contentType)
+				.upsert(document, contentType);
+		}
+
+		@Override
+		public IndexRequest createIndexRequest(
+				String index,
+				String docType,
+				XContentType contentType,
+				byte[] document) {
+			return new IndexRequest(index, docType)
+				.source(document, contentType);
+		}
+
+		@Override
+		public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+			return new DeleteRequest(index, docType, key);
+		}
+	}
+}
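
The DefaultRestClientFactory above only covers the two REST options exposed through the descriptor (max retry timeout and path prefix). If further client customization is needed, a user-defined factory can be passed to ElasticsearchSink.Builder#setRestClientFactory directly; a minimal sketch, restricted to the RestClientBuilder calls already used in this file (the class name and values are hypothetical):

    // Hypothetical user-defined factory; only illustrates the extension point.
    static class MyRestClientFactory implements RestClientFactory {

        @Override
        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
            restClientBuilder.setPathPrefix("/myapp");           // same call as DefaultRestClientFactory
            restClientBuilder.setMaxRetryTimeoutMillis(30000);   // assumed example value
        }
    }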
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactory.java
new file mode 100644
index 00000000000..1b9b142dbfc
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.types.Row;
+
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_VERSION_VALUE_6;
+
+/**
+ * Table factory for creating an {@link UpsertStreamTableSink} for Elasticsearch 6.
+ */
+@Internal
+public class Elasticsearch6UpsertTableSinkFactory extends ElasticsearchUpsertTableSinkFactoryBase {
+
+	@Override
+	protected String elasticsearchVersion() {
+		return CONNECTOR_VERSION_VALUE_6;
+	}
+
+	@Override
+	protected ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
+			boolean isAppendOnly,
+			TableSchema schema,
+			List<Host> hosts,
+			String index,
+			String docType,
+			String keyDelimiter,
+			String keyNullLiteral,
+			SerializationSchema<Row> serializationSchema,
+			XContentType contentType,
+			ActionRequestFailureHandler failureHandler,
+			Map<SinkOption, String> sinkOptions) {
+
+		return new Elasticsearch6UpsertTableSink(
+			isAppendOnly,
+			schema,
+			hosts,
+			index,
+			docType,
+			keyDelimiter,
+			keyNullLiteral,
+			serializationSchema,
+			contentType,
+			failureHandler,
+			sinkOptions);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
index 4e7a2635738..484e6f6e151 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -32,6 +32,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests}
@@ -207,5 +208,31 @@ public void setRestClientFactory(RestClientFactory restClientFactory) {
 		public ElasticsearchSink<T> build() {
 			return new ElasticsearchSink<>(bulkRequestsConfig, httpHosts, elasticsearchSinkFunction, failureHandler, restClientFactory);
 		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			Builder<?> builder = (Builder<?>) o;
+			return Objects.equals(httpHosts, builder.httpHosts) &&
+				Objects.equals(elasticsearchSinkFunction, builder.elasticsearchSinkFunction) &&
+				Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig) &&
+				Objects.equals(failureHandler, builder.failureHandler) &&
+				Objects.equals(restClientFactory, builder.restClientFactory);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(
+				httpHosts,
+				elasticsearchSinkFunction,
+				bulkRequestsConfig,
+				failureHandler,
+				restClientFactory);
+		}
 	}
 }
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 00000000000..14c309d1c6c
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSinkFactory
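
This service entry is what makes the factory discoverable: TableFactoryService loads all registered TableFactory implementations through the JDK ServiceLoader and matches them against the flattened connector/format properties. A sketch of the lookup, mirroring the test base earlier in this diff (the property map is abbreviated):

    // Resolve and create the sink from flat properties, e.g. from a descriptor or YAML environment.
    final Map<String, String> propertiesMap = new HashMap<>();
    propertiesMap.put("connector.type", "elasticsearch");
    propertiesMap.put("connector.version", "6");
    // ... plus hosts, index, document-type, schema and format properties

    final TableSink<?> sink = TableFactoryService
        .find(StreamTableSinkFactory.class, propertiesMap)
        .createStreamTableSink(propertiesMap);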
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java
new file mode 100644
index 00000000000..3ca9022bb58
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.ElasticsearchUpsertSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryTestBase;
+import org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSink.DefaultRestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_VERSION_VALUE_6;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link Elasticsearch6UpsertTableSink} created by {@link Elasticsearch6UpsertTableSinkFactory}.
+ */
+public class Elasticsearch6UpsertTableSinkFactoryTest extends ElasticsearchUpsertTableSinkFactoryTestBase {
+
+	@Test
+	public void testBuilder() {
+		final TableSchema schema = createTestSchema();
+
+		final TestElasticsearch6UpsertTableSink testSink = new TestElasticsearch6UpsertTableSink(
+			false,
+			schema,
+			Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
+			INDEX,
+			DOC_TYPE,
+			KEY_DELIMITER,
+			KEY_NULL_LITERAL,
+			new JsonRowSerializationSchema(schema.toRowType()),
+			XContentType.JSON,
+			new DummyFailureHandler(),
+			createTestSinkOptions());
+
+		final DataStreamMock dataStreamMock = new DataStreamMock(
+				new StreamExecutionEnvironmentMock(),
+				Types.TUPLE(Types.BOOLEAN, schema.toRowType()));
+
+		testSink.emitDataStream(dataStreamMock);
+
+		final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
+			Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
+			new ElasticsearchUpsertSinkFunction(
+				INDEX,
+				DOC_TYPE,
+				KEY_DELIMITER,
+				KEY_NULL_LITERAL,
+				new JsonRowSerializationSchema(schema.toRowType()),
+				XContentType.JSON,
+				Elasticsearch6UpsertTableSink.UPDATE_REQUEST_FACTORY,
+				new int[0]));
+		expectedBuilder.setFailureHandler(new DummyFailureHandler());
+		expectedBuilder.setBulkFlushBackoff(true);
+		expectedBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+		expectedBuilder.setBulkFlushBackoffDelay(123);
+		expectedBuilder.setBulkFlushBackoffRetries(3);
+		expectedBuilder.setBulkFlushInterval(100);
+		expectedBuilder.setBulkFlushMaxActions(1000);
+		expectedBuilder.setBulkFlushMaxSizeMb(1);
+		expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));
+
+		assertEquals(expectedBuilder, testSink.builder);
+	}
+
+	@Override
+	protected String getElasticsearchVersion() {
+		return CONNECTOR_VERSION_VALUE_6;
+	}
+
+	@Override
+	protected ElasticsearchUpsertTableSinkBase getExpectedTableSink(
+			boolean isAppendOnly,
+			TableSchema schema,
+			List<Host> hosts,
+			String index,
+			String docType,
+			String keyDelimiter,
+			String keyNullLiteral,
+			SerializationSchema<Row> serializationSchema,
+			XContentType contentType,
+			ActionRequestFailureHandler failureHandler,
+			Map<SinkOption, String> sinkOptions) {
+		return new Elasticsearch6UpsertTableSink(
+			isAppendOnly,
+			schema,
+			hosts,
+			index,
+			docType,
+			keyDelimiter,
+			keyNullLiteral,
+			serializationSchema,
+			contentType,
+			failureHandler,
+			sinkOptions);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	private static class TestElasticsearch6UpsertTableSink extends Elasticsearch6UpsertTableSink {
+
+		public ElasticsearchSink.Builder<Tuple2<Boolean, Row>> builder;
+
+		public TestElasticsearch6UpsertTableSink(
+				boolean isAppendOnly,
+				TableSchema schema,
+				List<Host> hosts,
+				String index,
+				String docType,
+				String keyDelimiter,
+				String keyNullLiteral,
+				SerializationSchema<Row> serializationSchema,
+				XContentType contentType,
+				ActionRequestFailureHandler failureHandler,
+				Map<SinkOption, String> sinkOptions) {
+
+			super(
+				isAppendOnly,
+				schema,
+				hosts,
+				index,
+				docType,
+				keyDelimiter,
+				keyNullLiteral,
+				serializationSchema,
+				contentType,
+				failureHandler,
+				sinkOptions);
+		}
+
+		@Override
+		protected ElasticsearchSink.Builder<Tuple2<Boolean, Row>> createBuilder(
+				ElasticsearchUpsertSinkFunction upsertSinkFunction,
+				List<HttpHost> httpHosts) {
+			builder = super.createBuilder(upsertSinkFunction, httpHosts);
+			return builder;
+		}
+	}
+
+	private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {
+
+		@Override
+		public JobExecutionResult execute(String jobName) {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	private static class DataStreamMock extends DataStream<Tuple2<Boolean, Row>> {
+
+		public SinkFunction<?> sinkFunction;
+
+		public DataStreamMock(StreamExecutionEnvironment environment, TypeInformation<Tuple2<Boolean, Row>> outType) {
+			super(environment, new StreamTransformationMock("name", outType, 1));
+		}
+
+		@Override
+		public DataStreamSink<Tuple2<Boolean, Row>> addSink(SinkFunction<Tuple2<Boolean, Row>> sinkFunction) {
+			this.sinkFunction = sinkFunction;
+			return super.addSink(sinkFunction);
+		}
+	}
+
+	private static class StreamTransformationMock extends StreamTransformation<Tuple2<Boolean, Row>> {
+
+		public StreamTransformationMock(String name, TypeInformation<Tuple2<Boolean, Row>> outputType, int parallelism) {
+			super(name, outputType, parallelism);
+		}
+
+		@Override
+		public void setChainingStrategy(ChainingStrategy strategy) {
+			// do nothing
+		}
+
+		@Override
+		public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+			return null;
+		}
+	}
+}
diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index 6e69568939f..de1db9e6bf5 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -89,6 +89,14 @@ under the License.
 			<classifier>sql-jar</classifier>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<!-- Used by maven-dependency-plugin -->
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
 	</dependencies>
 
 	<dependencyManagement>
@@ -176,6 +184,13 @@ under the License.
 									<classifier>sql-jar</classifier>
 									<type>jar</type>
 								</artifactItem>-->
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
 							</artifactItems>
 						</configuration>
 					</execution>
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index fa6c33124be..e5f1ec9ebe4 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -60,7 +60,7 @@ function verify_elasticsearch_process_exist {
     exit 1
 }
 
-function verify_result {
+function verify_result_line_number {
     local numRecords=$1
     local index=$2
 
@@ -81,6 +81,29 @@ function verify_result {
     done
 }
 
+function verify_result_hash {
+  local name=$1
+  local index=$2
+  local numRecords=$3
+  local hash=$4
+
+  while : ; do
+    curl "localhost:9200/${index}/_search?q=*&pretty" > $TEST_DATA_DIR/es_output
+
+    if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/es_output)" ]; then
+      break
+    else
+      echo "Waiting for Elasticsearch records ..."
+      sleep 1
+    fi
+  done
+
+  # remove meta information
+  sed '2,9d' $TEST_DATA_DIR/es_output > $TEST_DATA_DIR/es_content
+
+  check_result_hash "$name" $TEST_DATA_DIR/es_content "$hash"
+}
+
 function shutdown_elasticsearch_cluster {
    local index=$1
    curl -X DELETE "http://localhost:9200/${index}"
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index ca0251365e6..30833fbf211 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -19,6 +19,7 @@
 
 source "$(dirname "$0")"/common.sh
 source "$(dirname "$0")"/kafka-common.sh
+source "$(dirname "$0")"/elasticsearch-common.sh
 
 SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
 SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/sql-jars
@@ -58,10 +59,10 @@ for SQL_JAR in $SQL_JARS_DIR/*.jar; do
 done
 
 ################################################################################
-# Run a SQL statement
+# Prepare connectors
 ################################################################################
 
-echo "Testing SQL statement..."
+ELASTICSEARCH_INDEX='my_users'
 
 function sql_cleanup() {
   # don't call ourselves again for another signal interruption
@@ -70,6 +71,7 @@ function sql_cleanup() {
   trap "" EXIT
 
   stop_kafka_cluster
+  shutdown_elasticsearch_cluster "$ELASTICSEARCH_INDEX"
 }
 trap sql_cleanup INT
 trap sql_cleanup EXIT
@@ -100,11 +102,26 @@ send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": null, "even
 # pending in results
 send_messages_to_kafka '{"timestamp": "2018-03-12 10:40:00", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' test-json
 
+# prepare Elasticsearch
+echo "Preparing Elasticsearch..."
+
+ELASTICSEARCH_VERSION=6
+DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
+
+setup_elasticsearch $DOWNLOAD_URL
+verify_elasticsearch_process_exist
+
+################################################################################
+# Run a SQL statement
+################################################################################
+
+echo "Testing SQL statement..."
+
 # prepare Flink
 echo "Preparing Flink..."
 
 start_cluster
-start_taskmanagers 1
+start_taskmanagers 2
 
 # create session environment file
 RESULT=$TEST_DATA_DIR/result
@@ -230,6 +247,54 @@ tables:
           type: BIGINT
         - name: constant
           type: VARCHAR
+  - name: ElasticsearchUpsertSinkTable
+    type: sink
+    update-mode: upsert
+    schema:
+      - name: user_id
+        type: INT
+      - name: user_name
+        type: VARCHAR
+      - name: user_count
+        type: BIGINT
+    connector:
+      type: elasticsearch
+      version: 6
+      hosts:
+        - hostname: "localhost"
+          port: 9200
+          protocol: "http"
+      index: "$ELASTICSEARCH_INDEX"
+      document-type: "user"
+      bulk-flush:
+        max-actions: 1
+    format:
+      type: json
+      derive-schema: true
+  - name: ElasticsearchAppendSinkTable
+    type: sink
+    update-mode: append
+    schema:
+      - name: user_id
+        type: INT
+      - name: user_name
+        type: VARCHAR
+      - name: user_count
+        type: BIGINT
+    connector:
+      type: elasticsearch
+      version: 6
+      hosts:
+        - hostname: "localhost"
+          port: 9200
+          protocol: "http"
+      index: "$ELASTICSEARCH_INDEX"
+      document-type: "user"
+      bulk-flush:
+        max-actions: 1
+    format:
+      type: json
+      derive-schema: true
 
 functions:
   - name: RegReplace
@@ -239,6 +304,8 @@ EOF
 
 # submit SQL statements
 
+echo "Executing SQL: Kafka JSON -> Kafka Avro"
+
 read -r -d '' SQL_STATEMENT_1 << EOF
 INSERT INTO AvroBothTable
   SELECT
@@ -254,7 +321,6 @@ INSERT INTO AvroBothTable
     TUMBLE(rowtime, INTERVAL '1' HOUR)
 EOF
 
-echo "Executing SQL: Kafka JSON -> Kafka Avro"
 echo "$SQL_STATEMENT_1"
 
 $FLINK_DIR/bin/sql-client.sh embedded \
@@ -263,13 +329,14 @@ $FLINK_DIR/bin/sql-client.sh embedded \
   --environment $SQL_CONF \
   --update "$SQL_STATEMENT_1"
 
+echo "Executing SQL: Kafka Avro -> Filesystem CSV"
+
 read -r -d '' SQL_STATEMENT_2 << EOF
 INSERT INTO CsvSinkTable
   SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
   FROM AvroBothTable
 EOF
 
-echo "Executing SQL: Kafka Avro -> Filesystem CSV"
 echo "$SQL_STATEMENT_2"
 
 $FLINK_DIR/bin/sql-client.sh embedded \
@@ -289,4 +356,51 @@ for i in {1..10}; do
   sleep 5
 done
 
-check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
+check_result_hash "SQL Client Kafka" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
+
+echo "Executing SQL: Values -> Elasticsearch (upsert)"
+
+read -r -d '' SQL_STATEMENT_3 << EOF
+INSERT INTO ElasticsearchUpsertSinkTable
+  SELECT user_id, user_name, COUNT(*) AS user_count
+  FROM (VALUES (1, 'Bob'), (22, 'Alice'), (42, 'Greg'), (42, 'Greg'), (42, 'Greg'), (1, 'Bob'))
+    AS UserCountTable(user_id, user_name)
+  GROUP BY user_id, user_name
+EOF
+
+JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
+  --library $SQL_JARS_DIR \
+  --jar $SQL_TOOLBOX_JAR \
+  --environment $SQL_CONF \
+  --update "$SQL_STATEMENT_3" | grep "Job ID:" | sed 's/.* //g')
+
+wait_job_terminal_state "$JOB_ID" "FINISHED"
+
+verify_result_hash "SQL Client Elasticsearch Upsert" "$ELASTICSEARCH_INDEX" 3 "982cb32908def9801e781381c1b8a8db"
+
+echo "Executing SQL: Values -> Elasticsearch (append, no key)"
+
+read -r -d '' SQL_STATEMENT_4 << EOF
+INSERT INTO ElasticsearchAppendSinkTable
+  SELECT *
+  FROM (
+    VALUES
+      (1, 'Bob', CAST(0 AS BIGINT)),
+      (22, 'Alice', CAST(0 AS BIGINT)),
+      (42, 'Greg', CAST(0 AS BIGINT)),
+      (42, 'Greg', CAST(0 AS BIGINT)),
+      (42, 'Greg', CAST(0 AS BIGINT)),
+      (1, 'Bob', CAST(0 AS BIGINT)))
+    AS UserCountTable(user_id, user_name, user_count)
+EOF
+
+JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
+  --library $SQL_JARS_DIR \
+  --jar $SQL_TOOLBOX_JAR \
+  --environment $SQL_CONF \
+  --update "$SQL_STATEMENT_4" | grep "Job ID:" | sed 's/.* //g')
+
+wait_job_terminal_state "$JOB_ID" "FINISHED"
+
+# 3 upsert results and 6 append results
+verify_result_line_number 9 "$ELASTICSEARCH_INDEX"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index 800c4e20ae0..36a38566707 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -46,4 +46,4 @@ $FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \
   --type type
 
 # 40 index requests and 20 final update requests
-verify_result 60 index
+verify_result_line_number 60 index
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 5f45cc35f2b..65191202ad8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -359,7 +359,7 @@ abstract class StreamTableEnvironment(
           case Some(keys) => upsertSink.setKeyFields(keys)
           case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
           case None if !isAppendOnlyTable => throw new TableException(
-            "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.")
+            "UpsertStreamTableSink requires that Table has full primary keys if it is updated.")
         }
         val outputType = sink.getOutputType
         val resultType = getResultType(table.getRelNode, optimizedPlan)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 2c88dfd141e..9328c806913 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -31,10 +31,11 @@ import org.apache.commons.lang.StringEscapeUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.MemorySize
 import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
 import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, normalizeTableSchema, toJava}
 import org.apache.flink.table.typeutils.TypeStringUtils
-import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.{InstantiationUtil, Preconditions}
 import org.apache.flink.util.Preconditions.checkNotNull
 
 import scala.collection.JavaConverters._
@@ -129,6 +130,15 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     putTableSchema(key, normalizeTableSchema(schema))
   }
 
+  /**
+    * Adds a Flink [[MemorySize]] under the given key.
+    */
+  def putMemorySize(key: String, size: MemorySize): Unit = {
+    checkNotNull(key)
+    checkNotNull(size)
+    put(key, size.toString)
+  }
+
   /**
     * Adds a table schema under the given key.
     */
@@ -411,6 +421,21 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     getOptionalTableSchema(key).orElseThrow(exceptionSupplier(key))
   }
 
+  /**
+    * Returns a Flink [[MemorySize]] under the given key if it exists.
+    */
+  def getOptionalMemorySize(key: String): Optional[MemorySize] = {
+    val value = properties.get(key).map(MemorySize.parse(_, MemorySize.MemoryUnit.BYTES))
+    toJava(value)
+  }
+
+  /**
+    * Returns a Flink [[MemorySize]] under the given existing key.
+    */
+  def getMemorySize(key: String): MemorySize = {
+    getOptionalMemorySize(key).orElseThrow(exceptionSupplier(key))
+  }
+
   /**
     * Returns the property keys of fixed indexed properties.
     *
@@ -550,6 +575,13 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     }.toMap.asJava
   }
 
+  /**
+    * Returns true if the value under the given key is exactly equal to the given value.
+    */
+  def isValue(key: String, value: String): Boolean = {
+    getString(key) == value
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   /**
@@ -963,6 +995,63 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     )
   }
 
+  /**
+    * Validates a Flink [[MemorySize]].
+    *
+    * The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB).
+    */
+  def validateMemorySize(
+      key: String,
+      isOptional: Boolean,
+      precision: Int)
+    : Unit = {
+    validateMemorySize(key, isOptional, precision, 0, Long.MaxValue)
+  }
+
+  /**
+    * Validates a Flink [[MemorySize]]. The boundaries are inclusive and in bytes.
+    *
+    * The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB).
+    */
+  def validateMemorySize(
+      key: String,
+      isOptional: Boolean,
+      precision: Int,
+      min: Long) // inclusive
+    : Unit = {
+    validateMemorySize(key, isOptional, precision, min, Long.MaxValue)
+  }
+
+  /**
+    * Validates a Flink [[MemorySize]]. The boundaries are inclusive and in bytes.
+    *
+    * The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB).
+    */
+  def validateMemorySize(
+      key: String,
+      isOptional: Boolean,
+      precision: Int,
+      min: Long, // inclusive
+      max: Long) // inclusive
+    : Unit = {
+    Preconditions.checkArgument(precision > 0)
+
+    validateComparable(
+      key,
+      isOptional,
+      Long.box(min),
+      Long.box(max),
+      "memory size (in bytes)",
+      (value: String) => {
+        val bytes = Long.box(MemorySize.parse(value, MemorySize.MemoryUnit.BYTES).getBytes)
+        if (bytes % precision != 0) {
+          throw new ValidationException(
+            s"Memory size for key '$key' must be a multiple of $precision bytes but was: $value")
+        }
+        bytes
+      })
+  }
+
   /**
     * Validates an enum property with a set of validation logic for each enum value.
     */
@@ -1089,7 +1178,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     */
   private def put(key: String, value: String): Unit = {
     if (properties.contains(key)) {
-      throw new IllegalStateException("Property already present:" + key)
+      throw new ValidationException(s"Property already present: $key")
     }
     if (normalizeKeys) {
       properties.put(key.toLowerCase, value)
@@ -1191,6 +1280,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       isOptional: Boolean,
       min: T,
       max: T,
+      typeName: String,
       parseFunction: String => T)
     : Unit = {
     if (!properties.contains(key)) {
@@ -1198,7 +1288,6 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         throw new ValidationException(s"Could not find required property '$key'.")
       }
     } else {
-      val typeName = min.getClass.getSimpleName
       try {
         val value = parseFunction(properties(key))
 
@@ -1207,12 +1296,33 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
             s" value between $min and $max but was: ${properties(key)}")
         }
       } catch {
-        case _: NumberFormatException =>
+        case _: NumberFormatException | _: IllegalArgumentException =>
           throw new ValidationException(
             s"Property '$key' must be a $typeName value but was: ${properties(key)}")
       }
     }
   }
+
+  /**
+    * Validates a property by first parsing the string value to a comparable object.
+    * The boundaries are inclusive.
+    */
+  private def validateComparable[T <: Comparable[T]](
+      key: String,
+      isOptional: Boolean,
+      min: T,
+      max: T,
+      parseFunction: String => T)
+    : Unit = {
+
+    validateComparable(
+      key,
+      isOptional,
+      min,
+      max,
+      min.getClass.getSimpleName,
+      parseFunction)
+  }
 }
 
 object DescriptorProperties {
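
The MemorySize helpers added to DescriptorProperties above can be used by descriptors (put), validators (validate) and factories (get). A hedged usage sketch, where the property key and the default value are purely illustrative:

    import org.apache.flink.configuration.MemorySize
    import org.apache.flink.table.descriptors.DescriptorProperties

    val props = new DescriptorProperties()

    // a descriptor stores the size via MemorySize.toString
    props.putMemorySize("connector.bulk-flush.max-size", MemorySize.parse("2 mb"))

    // a validator can restrict the value to whole megabytes
    // (precision is the minimum allowed unit in bytes)
    props.validateMemorySize("connector.bulk-flush.max-size", isOptional = true, precision = 1024 * 1024)

    // a factory reads the value back, falling back to a default if the key is absent
    val maxSize: MemorySize = props
      .getOptionalMemorySize("connector.bulk-flush.max-size")
      .orElse(MemorySize.parse("5 mb"))
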
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
index b9e64f90c90..24c3302c549 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
@@ -93,9 +93,5 @@ class StreamTableDescriptor(
   override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
     super.addProperties(properties)
     updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
-
-    // this performs only basic validation
-    // more validation can only happen within a factory
-    new StreamTableDescriptorValidator().validate(properties)
   }
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
index 5a6a946ea07..8d60f89d0e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
@@ -25,16 +25,27 @@ import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
 /**
   * Validator for [[StreamTableDescriptor]].
   */
-class StreamTableDescriptorValidator extends DescriptorValidator {
+class StreamTableDescriptorValidator(
+    supportsAppend: Boolean,
+    supportsRetract: Boolean,
+    supportsUpsert: Boolean)
+  extends DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
+    val modeList = new util.ArrayList[String]()
+    if (supportsAppend) {
+      modeList.add(UPDATE_MODE_VALUE_APPEND)
+    }
+    if (supportsRetract) {
+      modeList.add(UPDATE_MODE_VALUE_RETRACT)
+    }
+    if (supportsUpsert) {
+      modeList.add(UPDATE_MODE_VALUE_UPSERT)
+    }
     properties.validateEnumValues(
       UPDATE_MODE,
       isOptional = false,
-      util.Arrays.asList(
-        UPDATE_MODE_VALUE_APPEND,
-        UPDATE_MODE_VALUE_RETRACT,
-        UPDATE_MODE_VALUE_UPSERT)
+      modeList
     )
   }
 }
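
With the reworked validator, each connector declares which update modes it supports. A hedged sketch of how a table factory might call it, allowing append and upsert but rejecting retraction (the helper name and propertyMap parameter are illustrative):

    import java.util.{Map => JMap}

    import org.apache.flink.table.descriptors.{DescriptorProperties, StreamTableDescriptorValidator}

    def validateUpdateMode(propertyMap: JMap[String, String]): Unit = {
      val properties = new DescriptorProperties(true)
      properties.putProperties(propertyMap)  // assumes the normalized properties handed to the factory

      new StreamTableDescriptorValidator(
        supportsAppend = true,
        supportsRetract = false,
        supportsUpsert = true
      ).validate(properties)
    }
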
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
index c171573eed1..d3be50780f6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
@@ -75,5 +75,6 @@ trait UpsertStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
   /** Emits the DataStream. */
   def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit
 
-  override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
+  override def getOutputType: TypeInformation[JTuple2[JBool, T]] =
+    new TupleTypeInfo(Types.BOOLEAN, getRecordType)
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index d1666a9547d..02af79843a4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -90,6 +90,12 @@ object TypeCheckUtils {
   def isComparable(dataType: TypeInformation[_]): Boolean =
     classOf[Comparable[_]].isAssignableFrom(dataType.getTypeClass) && !isArray(dataType)
 
+  /**
+    * Types that can be easily converted into a string without ambiguity.
+    */
+  def isSimpleStringRepresentation(dataType: TypeInformation[_]): Boolean =
+    isNumeric(dataType) || isString(dataType) || isTemporal(dataType) || isBoolean(dataType)
+
   def assertNumericExpr(
       dataType: TypeInformation[_],
       caller: String)
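
The new isSimpleStringRepresentation check accepts numeric, string, temporal and boolean types and rejects everything else. A hedged sketch with example types (chosen from org.apache.flink.api.common.typeinfo.Types, not from the PR itself):

    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.table.typeutils.TypeCheckUtils

    TypeCheckUtils.isSimpleStringRepresentation(Types.STRING)                         // true
    TypeCheckUtils.isSimpleStringRepresentation(Types.LONG)                           // true (numeric)
    TypeCheckUtils.isSimpleStringRepresentation(Types.SQL_TIMESTAMP)                  // true (temporal)
    TypeCheckUtils.isSimpleStringRepresentation(Types.BOOLEAN)                        // true
    TypeCheckUtils.isSimpleStringRepresentation(Types.ROW(Types.INT, Types.STRING))   // false (composite)
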
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 9382ec9b5d8..160ca465042 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -461,7 +461,7 @@ check_shaded_artifacts_connector_elasticsearch() {
 	VARIANT=$1
 	find flink-connectors/flink-connector-elasticsearch${VARIANT}/target/flink-connector-elasticsearch${VARIANT}*.jar ! -name "*-tests.jar" -exec jar tf {} \; > allClasses
 
-	UNSHADED_CLASSES=`cat allClasses | grep -v -e '^META-INF' -e '^assets' -e "^org/apache/flink/streaming/connectors/elasticsearch/" -e "^org/apache/flink/streaming/connectors/elasticsearch${VARIANT}/" -e "^org/elasticsearch/" | grep '\.class$'`
+	UNSHADED_CLASSES=`cat allClasses | grep -v -e '^META-INF' -e '^assets' -e "^org/apache/flink/streaming/connectors/elasticsearch/" -e "^org/apache/flink/streaming/connectors/elasticsearch${VARIANT}/" -e "^org/apache/flink/table/descriptors/" -e "^org/elasticsearch/" | grep '\.class$'`
 	if [ "$?" == "0" ]; then
 		echo "=============================================================================="
 		echo "Detected unshaded dependencies in flink-connector-elasticsearch${VARIANT}'s fat jar:"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services