You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/05/31 03:27:10 UTC

[flink] branch master updated: [FLINK-12440][python] Add all connector support align Java Table API.

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 836fdff  [FLINK-12440][python] Add all connector support align Java Table API.
836fdff is described below

commit 836fdfff0db64ff8241f38e8dd362dd50a9d1895
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Fri May 24 14:45:30 2019 +0800

    [FLINK-12440][python] Add all connector support align Java Table API.
    
    This closes #8531
---
 .../main/flink-bin/bin/pyflink-gateway-server.sh   |   2 +-
 flink-python/pyflink/table/__init__.py             |   4 +-
 flink-python/pyflink/table/table_descriptor.py     | 485 ++++++++++++++++++++-
 .../pyflink/table/tests/test_descriptor.py         | 407 ++++++++++++++++-
 tools/travis_controller.sh                         |   2 +
 5 files changed, 883 insertions(+), 17 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
index 026f813..9e41ad5 100644
--- a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
+++ b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
@@ -63,7 +63,7 @@ if [[ -n "$FLINK_TESTING" ]]; then
     else
       FLINK_TEST_CLASSPATH="$FLINK_TEST_CLASSPATH":"$testJarFile"
     fi
-  done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d -name 'flink-*-tests.jar' -print0 | sort -z)
+  done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d \( -name 'flink-*-tests.jar' -o -path "${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-elasticsearch-base/target/flink*.jar" -o -path "${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-kafka-base/target/flink*.jar" \) -print0 | sort -z)
 fi
 
 exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH}:${FLINK_TEST_CLASSPATH} ${DRIVER} ${ARGS[@]}
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 281647f..904264e 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -40,7 +40,7 @@ from pyflink.table.table_sink import TableSink, CsvTableSink
 from pyflink.table.table_source import TableSource, CsvTableSource
 from pyflink.table.types import DataTypes, UserDefinedType, Row
 from pyflink.table.window import Tumble, Session, Slide, Over
-from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem
+from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem, Kafka, Elasticsearch
 
 __all__ = [
     'TableEnvironment',
@@ -63,4 +63,6 @@ __all__ = [
     'FileSystem',
     'UserDefinedType',
     'Row',
+    'Kafka',
+    'Elasticsearch'
 ]
diff --git a/flink-python/pyflink/table/table_descriptor.py b/flink-python/pyflink/table/table_descriptor.py
index 1dfbde3..65161b4 100644
--- a/flink-python/pyflink/table/table_descriptor.py
+++ b/flink-python/pyflink/table/table_descriptor.py
@@ -30,7 +30,9 @@ __all__ = [
     'Rowtime',
     'Schema',
     'OldCsv',
-    'FileSystem'
+    'FileSystem',
+    'Kafka',
+    'Elasticsearch'
 ]
 
 
@@ -256,7 +258,7 @@ class OldCsv(FormatDescriptor):
         format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use
         the old one for stream/batch filesystem operations for now.
 
-    .. note::
+    ..note::
         Deprecated: use the RFC-compliant `Csv` format instead when writing to Kafka.
     """
 
@@ -373,6 +375,485 @@ class FileSystem(ConnectorDescriptor):
         return self
 
 
+class Kafka(ConnectorDescriptor):
+    """
+    Connector descriptor for the Apache Kafka message queue.
+    """
+
+    def __init__(self):
+        gateway = get_gateway()
+        self._j_kafka = gateway.jvm.Kafka()
+        super(Kafka, self).__init__(self._j_kafka)
+
+    def version(self, version):
+        """
+        Sets the Kafka version to be used.
+
+        :param version: Kafka version. E.g., "0.8", "0.11", etc.
+        :return: This object.
+        """
+        if not isinstance(version, (str, unicode)):
+            version = str(version)
+        self._j_kafka = self._j_kafka.version(version)
+        return self
+
+    def topic(self, topic):
+        """
+        Sets the topic from which the table is read.
+
+        :param topic: The topic from which the table is read.
+        :return: This object.
+        """
+        self._j_kafka = self._j_kafka.topic(topic)
+        return self
+
+    def properties(self, property_dict):
+        """
+        Sets the configuration properties for the Kafka consumer. Resets previously set properties.
+
+        :param property_dict: The dict object contains configuration properties for the Kafka
+                              consumer. Both the keys and values should be strings.
+        :return: This object.
+        """
+        gateway = get_gateway()
+        properties = gateway.jvm.java.util.Properties()
+        for key in property_dict:
+            properties.setProperty(key, property_dict[key])
+        self._j_kafka = self._j_kafka.properties(properties)
+        return self
+
+    def property(self, key, value):
+        """
+        Adds a configuration properties for the Kafka consumer.
+
+        :param key: Property key string for the Kafka consumer.
+        :param value: Property value string for the Kafka consumer.
+        :return: This object.
+        """
+        self._j_kafka = self._j_kafka.property(key, value)
+        return self
+
+    def start_from_earliest(self):
+        """
+        Specifies the consumer to start reading from the earliest offset for all partitions.
+        This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+
+        This method does not affect where partitions are read from when the consumer is restored
+        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+        savepoint, only the offsets in the restored state will be used.
+
+        :return: This object.
+        """
+        self._j_kafka = self._j_kafka.startFromEarliest()
+        return self
+
+    def start_from_latest(self):
+        """
+        Specifies the consumer to start reading from the latest offset for all partitions.
+        This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+
+        This method does not affect where partitions are read from when the consumer is restored
+        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+        savepoint, only the offsets in the restored state will be used.
+
+        :return: This object.
+        """
+        self._j_kafka = self._j_kafka.startFromLatest()
+        return self
+
+    def start_from_group_offsets(self):
+        """
+        Specifies the consumer to start reading from any committed group offsets found
+        in Zookeeper / Kafka brokers. The "group.id" property must be set in the configuration
+        properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
+        set in the configuration properties will be used for the partition.
+
+        This method does not affect where partitions are read from when the consumer is restored
+        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+        savepoint, only the offsets in the restored state will be used.
+
+        :return: This object.
+        """
+        self._j_kafka = self._j_kafka.startFromGroupOffsets()
+        return self
+
+    def start_from_specific_offsets(self, specific_offsets_dict):
+        """
+        Specifies the consumer to start reading partitions from specific offsets, set independently
+        for each partition. The specified offset should be the offset of the next record that will
+        be read from partitions. This lets the consumer ignore any committed group offsets in
+        Zookeeper / Kafka brokers.
+
+        If the provided map of offsets contains entries whose partition is not subscribed by the
+        consumer, the entry will be ignored. If the consumer subscribes to a partition that does
+        not exist in the provided map of offsets, the consumer will fallback to the default group
+        offset behaviour(see :func:`pyflink.table.table_descriptor.Kafka.start_from_group_offsets`)
+        for that particular partition.
+
+        If the specified offset for a partition is invalid, or the behaviour for that partition is
+        defaulted to group offsets but still no group offset could be found for it, then the
+        "auto.offset.reset" behaviour set in the configuration properties will be used for the
+        partition.
+
+        This method does not affect where partitions are read from when the consumer is restored
+        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+        savepoint, only the offsets in the restored state will be used.
+
+        :param specific_offsets_dict: Dict of specific_offsets that the key is int-type partition
+                                      id and value is int-type offset value.
+        :return: This object.
+        """
+        for key in specific_offsets_dict:
+            self.start_from_specific_offset(key, specific_offsets_dict[key])
+        return self
+
+    def start_from_specific_offset(self, partition, specific_offset):
+        """
+        Configures to start reading partitions from specific offsets and specifies the given offset
+        for the given partition.
+
+        see :func:`pyflink.table.table_descriptor.Kafka.start_from_specific_offsets`
+
+        :param partition:
+        :param specific_offset:
+        :return: This object.
+        """
+        self._j_kafka = self._j_kafka.startFromSpecificOffset(int(partition), int(specific_offset))
+        return self
+
+    def sink_partitioner_fixed(self):
+        """
+        Configures how to partition records from Flink's partitions into Kafka's partitions.
+
+        This strategy ensures that each Flink partition ends up in one Kafka partition.
+
+        ..note::
+            One Kafka partition can contain multiple Flink partitions. Examples:
+
+            More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain
+            the output of more than one flink partition:
+
+            |    Flink Sinks --------- Kafka Partitions
+            |        1    ---------------->    1
+            |        2    --------------/
+            |        3    -------------/
+            |        4    ------------/
+
+            Fewer Flink partitions than Kafka partitions:
+
+            |    Flink Sinks --------- Kafka Partitions
+            |        1    ---------------->    1
+            |        2    ---------------->    2
+            |                                  3
+            |                                  4
+            |                                  5
+
+        :return: This object.
+        """
+        self._j_kafka = self._j_kafka.sinkPartitionerFixed()
+        return self
+
+    def sink_partitioner_round_robin(self):
+        """
+        Configures how to partition records from Flink's partitions into Kafka's partitions.
+
+        This strategy ensures that records will be distributed to Kafka partitions in a
+        round-robin fashion.
+
+        ..note::
+            This strategy is useful to avoid an unbalanced partitioning. However, it will cause a
+            lot of network connections between all the Flink instances and all the Kafka brokers.
+
+        :return: This object.
+        """
+        self._j_kafka = self._j_kafka.sinkPartitionerRoundRobin()
+        return self
+
+    def sink_partitioner_custom(self, partitioner_class_name):
+        """
+        Configures how to partition records from Flink's partitions into Kafka's partitions.
+
+        This strategy allows for a custom partitioner by providing an implementation
+        of ``FlinkKafkaPartitioner``.
+
+        :param partitioner_class_name: The java canonical class name of the FlinkKafkaPartitioner.
+                                       The FlinkKafkaPartitioner must have a public no-argument
+                                       constructor and can be founded by in current Java
+                                       classloader.
+        :return: This object.
+        """
+        gateway = get_gateway()
+        self._j_kafka = self._j_kafka.sinkPartitionerCustom(
+            gateway.jvm.Thread.currentThread().getContextClassLoader()
+                   .loadClass(partitioner_class_name))
+        return self
+
+
+class Elasticsearch(ConnectorDescriptor):
+    """
+    Connector descriptor for the Elasticsearch search engine.
+    """
+
+    def __init__(self):
+        gateway = get_gateway()
+        self._j_elasticsearch = gateway.jvm.Elasticsearch()
+        super(Elasticsearch, self).__init__(self._j_elasticsearch)
+
+    def version(self, version):
+        """
+        Sets the Elasticsearch version to be used. Required.
+
+        :param version: Elasticsearch version. E.g., "6".
+        :return: This object.
+        """
+        if not isinstance(version, (str, unicode)):
+            version = str(version)
+        self._j_elasticsearch = self._j_elasticsearch.version(version)
+        return self
+
+    def host(self, hostname, port, protocol):
+        """
+        Adds an Elasticsearch host to connect to. Required.
+
+        Multiple hosts can be declared by calling this method multiple times.
+
+        :param hostname: Connection hostname.
+        :param port: Connection port.
+        :param protocol: Connection protocol; e.g. "http".
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.host(hostname, int(port), protocol)
+        return self
+
+    def index(self, index):
+        """
+        Declares the Elasticsearch index for every record. Required.
+
+        :param index: Elasticsearch index.
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.index(index)
+        return self
+
+    def document_type(self, document_type):
+        """
+        Declares the Elasticsearch document type for every record. Required.
+
+        :param document_type: Elasticsearch document type.
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.documentType(document_type)
+        return self
+
+    def key_delimiter(self, key_delimiter):
+        """
+        Sets a custom key delimiter in case the Elasticsearch ID needs to be constructed from
+        multiple fields. Optional.
+
+        :param key_delimiter: Key delimiter; e.g., "$" would result in IDs "KEY1$KEY2$KEY3".
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.keyDelimiter(key_delimiter)
+        return self
+
+    def key_null_literal(self, key_null_literal):
+        """
+        Sets a custom representation for null fields in keys. Optional.
+
+        :param key_null_literal: key null literal string; e.g. "N/A" would result in IDs
+                                 "KEY1_N/A_KEY3".
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.keyNullLiteral(key_null_literal)
+        return self
+
+    def failure_handler_fail(self):
+        """
+        Configures a failure handling strategy in case a request to Elasticsearch fails.
+
+        This strategy throws an exception if a request fails and thus causes a job failure.
+
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.failureHandlerFail()
+        return self
+
+    def failure_handler_ignore(self):
+        """
+        Configures a failure handling strategy in case a request to Elasticsearch fails.
+
+        This strategy ignores failures and drops the request.
+
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.failureHandlerIgnore()
+        return self
+
+    def failure_handler_retry_rejected(self):
+        """
+        Configures a failure handling strategy in case a request to Elasticsearch fails.
+
+        This strategy re-adds requests that have failed due to queue capacity saturation.
+
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.failureHandlerRetryRejected()
+        return self
+
+    def failure_handler_custom(self, failure_handler_class_name):
+        """
+        Configures a failure handling strategy in case a request to Elasticsearch fails.
+
+        This strategy allows for custom failure handling using a ``ActionRequestFailureHandler``.
+
+        :param failure_handler_class_name:
+        :return: This object.
+        """
+        gateway = get_gateway()
+        self._j_elasticsearch = self._j_elasticsearch.failureHandlerCustom(
+            gateway.jvm.Thread.currentThread().getContextClassLoader()
+                   .loadClass(failure_handler_class_name))
+        return self
+
+    def disable_flush_on_checkpoint(self):
+        """
+        Disables flushing on checkpoint. When disabled, a sink will not wait for all pending action
+        requests to be acknowledged by Elasticsearch on checkpoints.
+
+        ..note::
+            If flushing on checkpoint is disabled, a Elasticsearch sink does NOT
+            provide any strong guarantees for at-least-once delivery of action requests.
+
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.disableFlushOnCheckpoint()
+        return self
+
+    def bulk_flush_max_actions(self, max_actions_num):
+        """
+        Configures how to buffer elements before sending them in bulk to the cluster for
+        efficiency.
+
+        Sets the maximum number of actions to buffer for each bulk request.
+
+        :param max_actions_num: the maximum number of actions to buffer per bulk request.
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.bulkFlushMaxActions(int(max_actions_num))
+        return self
+
+    def bulk_flush_max_size(self, max_size):
+        """
+        Configures how to buffer elements before sending them in bulk to the cluster for
+        efficiency.
+
+        Sets the maximum size of buffered actions per bulk request (using the syntax of
+        MemorySize).
+
+        :param max_size: The maximum size. E.g. "42 mb". only MB granularity is supported.
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.bulkFlushMaxSize(max_size)
+        return self
+
+    def bulk_flush_interval(self, interval):
+        """
+        Configures how to buffer elements before sending them in bulk to the cluster for
+        efficiency.
+
+        Sets the bulk flush interval (in milliseconds).
+
+        :param interval: Bulk flush interval (in milliseconds).
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.bulkFlushInterval(int(interval))
+        return self
+
+    def bulk_flush_backoff_constant(self):
+        """
+        Configures how to buffer elements before sending them in bulk to the cluster for
+        efficiency.
+
+        Sets a constant backoff type to use when flushing bulk requests.
+
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffConstant()
+        return self
+
+    def bulk_flush_backoff_exponential(self):
+        """
+        Configures how to buffer elements before sending them in bulk to the cluster for
+        efficiency.
+
+        Sets an exponential backoff type to use when flushing bulk requests.
+
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffExponential()
+        return self
+
+    def bulk_flush_backoff_max_retries(self, max_retries):
+        """
+        Configures how to buffer elements before sending them in bulk to the cluster for
+        efficiency.
+
+        Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+
+        Make sure to enable backoff by selecting a strategy (
+        :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_constant` or
+        :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_exponential`).
+
+        :param max_retries: The maximum number of retries.
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffMaxRetries(int(max_retries))
+        return self
+
+    def bulk_flush_backoff_delay(self, delay):
+        """
+        Configures how to buffer elements before sending them in bulk to the cluster for
+        efficiency.
+
+        Sets the amount of delay between each backoff attempt when flushing bulk requests
+        (in milliseconds).
+
+        Make sure to enable backoff by selecting a strategy (
+        :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_constant` or
+        :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_exponential`).
+
+        :param delay: Delay between each backoff attempt (in milliseconds).
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffDelay(int(delay))
+        return self
+
+    def connection_max_retry_timeout(self, max_retry_timeout):
+        """
+        Sets connection properties to be used during REST communication to Elasticsearch.
+
+        Sets the maximum timeout (in milliseconds) in case of multiple retries of the same request.
+
+        :param max_retry_timeout: Maximum timeout (in milliseconds).
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.connectionMaxRetryTimeout(
+            int(max_retry_timeout))
+        return self
+
+    def connection_path_prefix(self, path_prefix):
+        """
+        Sets connection properties to be used during REST communication to Elasticsearch.
+
+        Adds a path prefix to every REST communication.
+
+        :param path_prefix: Prefix string to be added to every REST communication.
+        :return: This object.
+        """
+        self._j_elasticsearch = self._j_elasticsearch.connectionPathPrefix(path_prefix)
+        return self
+
+
 class ConnectTableDescriptor(Descriptor):
     """
     Common class for table's created with :class:`pyflink.table.TableEnvironment.connect`.
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 4fcc355..c9370fa 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -17,7 +17,8 @@
 ################################################################################
 import os
 
-from pyflink.table.table_descriptor import (FileSystem, OldCsv, Rowtime, Schema)
+from pyflink.table.table_descriptor import (FileSystem, OldCsv, Rowtime, Schema, Kafka,
+                                            Elasticsearch)
 from pyflink.table.table_sink import CsvTableSink
 from pyflink.table.types import DataTypes
 from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase,
@@ -29,7 +30,7 @@ class FileSystemDescriptorTests(PyFlinkTestCase):
     def test_path(self):
         file_system = FileSystem()
 
-        file_system.path("/test.csv")
+        file_system = file_system.path("/test.csv")
 
         properties = file_system.to_properties()
         expected = {'connector.property-version': '1',
@@ -38,12 +39,392 @@ class FileSystemDescriptorTests(PyFlinkTestCase):
         assert properties == expected
 
 
+class KafkaDescriptorTests(PyFlinkTestCase):
+
+    def test_version(self):
+        kafka = Kafka()
+
+        kafka = kafka.version("0.11")
+
+        properties = kafka.to_properties()
+        expected = {'connector.version': '0.11',
+                    'connector.type': 'kafka',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_topic(self):
+        kafka = Kafka()
+
+        kafka = kafka.topic("topic1")
+
+        properties = kafka.to_properties()
+        expected = {'connector.type': 'kafka',
+                    'connector.topic': 'topic1',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_properties(self):
+        kafka = Kafka()
+
+        kafka = kafka.properties({"zookeeper.connect": "localhost:2181",
+                                  "bootstrap.servers": "localhost:9092"})
+
+        properties = kafka.to_properties()
+        expected = {'connector.type': 'kafka',
+                    'connector.properties.0.key': 'zookeeper.connect',
+                    'connector.properties.0.value': 'localhost:2181',
+                    'connector.properties.1.key': 'bootstrap.servers',
+                    'connector.properties.1.value': 'localhost:9092',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_property(self):
+        kafka = Kafka()
+
+        kafka = kafka.property("group.id", "testGroup")
+
+        properties = kafka.to_properties()
+        expected = {'connector.type': 'kafka',
+                    'connector.properties.0.key': 'group.id',
+                    'connector.properties.0.value': 'testGroup',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_start_from_earliest(self):
+        kafka = Kafka()
+
+        kafka = kafka.start_from_earliest()
+
+        properties = kafka.to_properties()
+        expected = {'connector.type': 'kafka',
+                    'connector.startup-mode': 'earliest-offset',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_start_from_latest(self):
+        kafka = Kafka()
+
+        kafka = kafka.start_from_latest()
+
+        properties = kafka.to_properties()
+        expected = {'connector.type': 'kafka',
+                    'connector.startup-mode': 'latest-offset',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_start_from_group_offsets(self):
+        kafka = Kafka()
+
+        kafka = kafka.start_from_group_offsets()
+
+        properties = kafka.to_properties()
+        expected = {'connector.type': 'kafka',
+                    'connector.startup-mode': 'group-offsets',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_start_from_specific_offsets(self):
+        kafka = Kafka()
+
+        kafka = kafka.start_from_specific_offsets({1: 220, 3: 400})
+
+        properties = kafka.to_properties()
+        expected = {'connector.startup-mode': 'specific-offsets',
+                    'connector.specific-offsets.0.partition': '1',
+                    'connector.specific-offsets.0.offset': '220',
+                    'connector.specific-offsets.1.partition': '3',
+                    'connector.specific-offsets.1.offset': '400',
+                    'connector.type': 'kafka',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_start_from_specific_offset(self):
+        kafka = Kafka()
+
+        kafka = kafka.start_from_specific_offset(3, 300)
+
+        properties = kafka.to_properties()
+        expected = {'connector.startup-mode': 'specific-offsets',
+                    'connector.specific-offsets.0.partition': '3',
+                    'connector.specific-offsets.0.offset': '300',
+                    'connector.type': 'kafka',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_sink_partitioner_fixed(self):
+        kafka = Kafka()
+
+        kafka = kafka.sink_partitioner_fixed()
+
+        properties = kafka.to_properties()
+        expected = {'connector.sink-partitioner': 'fixed',
+                    'connector.type': 'kafka',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_sink_partitioner_custom(self):
+        kafka = Kafka()
+
+        kafka = kafka.sink_partitioner_custom(
+            "org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner")
+
+        properties = kafka.to_properties()
+        expected = {'connector.sink-partitioner': 'custom',
+                    'connector.sink-partitioner-class':
+                        'org.apache.flink.streaming.connectors.kafka.partitioner.'
+                        'FlinkFixedPartitioner',
+                    'connector.type': 'kafka',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_sink_partitioner_round_robin(self):
+        kafka = Kafka()
+
+        kafka = kafka.sink_partitioner_round_robin()
+
+        properties = kafka.to_properties()
+        expected = {'connector.sink-partitioner': 'round-robin',
+                    'connector.type': 'kafka',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+
+class ElasticsearchDescriptorTest(PyFlinkTestCase):
+
+    def test_version(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.version("6")
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.type': 'elasticsearch',
+                    'connector.version': '6',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_host(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.host("localhost", 9200, "http")
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.hosts.0.hostname': 'localhost',
+                    'connector.hosts.0.port': '9200',
+                    'connector.hosts.0.protocol': 'http',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_index(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.index("MyUsers")
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.index': 'MyUsers',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_document_type(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.document_type("user")
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.document-type': 'user',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_key_delimiter(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.key_delimiter("$")
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.key-delimiter': '$',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_key_null_literal(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.key_null_literal("n/a")
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.key-null-literal': 'n/a',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_failure_handler_fail(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.failure_handler_fail()
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.failure-handler': 'fail',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_failure_handler_ignore(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.failure_handler_ignore()
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.failure-handler': 'ignore',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_failure_handler_retry_rejected(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.failure_handler_retry_rejected()
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.failure-handler': 'retry-rejected',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_failure_handler_custom(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.failure_handler_custom(
+            "org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler")
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.failure-handler': 'custom',
+                    'connector.failure-handler-class':
+                        'org.apache.flink.streaming.connectors.elasticsearch.util.'
+                        'IgnoringFailureHandler',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_disable_flush_on_checkpoint(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.disable_flush_on_checkpoint()
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.flush-on-checkpoint': 'false',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_bulk_flush_max_actions(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.bulk_flush_max_actions(42)
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.bulk-flush.max-actions': '42',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_bulk_flush_max_size(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.bulk_flush_max_size("42 mb")
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.bulk-flush.max-size': '44040192 bytes',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+
+        assert properties == expected
+
+    def test_bulk_flush_interval(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.bulk_flush_interval(2000)
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.bulk-flush.interval': '2000',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_bulk_flush_backoff_exponential(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.bulk_flush_backoff_exponential()
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.bulk-flush.backoff.type': 'exponential',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_bulk_flush_backoff_constant(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.bulk_flush_backoff_constant()
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.bulk-flush.backoff.type': 'constant',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_bulk_flush_backoff_max_retries(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.bulk_flush_backoff_max_retries(3)
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.bulk-flush.backoff.max-retries': '3',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_bulk_flush_backoff_delay(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.bulk_flush_backoff_delay(30000)
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.bulk-flush.backoff.delay': '30000',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_connection_max_retry_timeout(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.connection_max_retry_timeout(3000)
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.connection-max-retry-timeout': '3000',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_connection_path_prefix(self):
+        elasticsearch = Elasticsearch()
+
+        elasticsearch = elasticsearch.connection_path_prefix("/v1")
+
+        properties = elasticsearch.to_properties()
+        expected = {'connector.connection-path-prefix': '/v1',
+                    'connector.type': 'elasticsearch',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+
 class OldCsvDescriptorTests(PyFlinkTestCase):
 
     def test_field_delimiter(self):
         csv = OldCsv()
 
-        csv.field_delimiter("|")
+        csv = csv.field_delimiter("|")
 
         properties = csv.to_properties()
         expected = {'format.field-delimiter': '|',
@@ -54,7 +435,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
     def test_line_delimiter(self):
         csv = OldCsv()
 
-        csv.line_delimiter(";")
+        csv = csv.line_delimiter(";")
 
         expected = {'format.type': 'csv',
                     'format.property-version': '1',
@@ -66,7 +447,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
     def test_ignore_parse_errors(self):
         csv = OldCsv()
 
-        csv.ignore_parse_errors()
+        csv = csv.ignore_parse_errors()
 
         properties = csv.to_properties()
         expected = {'format.ignore-parse-errors': 'true',
@@ -77,7 +458,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
     def test_quote_character(self):
         csv = OldCsv()
 
-        csv.quote_character("*")
+        csv = csv.quote_character("*")
 
         properties = csv.to_properties()
         expected = {'format.quote-character': '*',
@@ -88,7 +469,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
     def test_comment_prefix(self):
         csv = OldCsv()
 
-        csv.comment_prefix("#")
+        csv = csv.comment_prefix("#")
 
         properties = csv.to_properties()
         expected = {'format.comment-prefix': '#',
@@ -99,7 +480,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
     def test_ignore_first_line(self):
         csv = OldCsv()
 
-        csv.ignore_first_line()
+        csv = csv.ignore_first_line()
 
         properties = csv.to_properties()
         expected = {'format.ignore-first-line': 'true',
@@ -363,7 +744,7 @@ class AbstractTableDescriptorTests(object):
     def test_with_format(self):
         descriptor = self.t_env.connect(FileSystem())
 
-        descriptor.with_format(OldCsv().field("a", "INT"))
+        descriptor = descriptor.with_format(OldCsv().field("a", "INT"))
 
         properties = descriptor.to_properties()
 
@@ -378,7 +759,7 @@ class AbstractTableDescriptorTests(object):
     def test_with_schema(self):
         descriptor = self.t_env.connect(FileSystem())
 
-        descriptor.with_format(OldCsv()).with_schema(Schema().field("a", "INT"))
+        descriptor = descriptor.with_format(OldCsv()).with_schema(Schema().field("a", "INT"))
 
         properties = descriptor.to_properties()
         expected = {'schema.0.name': 'a',
@@ -505,7 +886,7 @@ class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescri
     def test_in_append_mode(self):
         descriptor = self.t_env.connect(FileSystem())
 
-        descriptor\
+        descriptor = descriptor\
             .with_format(OldCsv())\
             .in_append_mode()
 
@@ -520,7 +901,7 @@ class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescri
     def test_in_retract_mode(self):
         descriptor = self.t_env.connect(FileSystem())
 
-        descriptor \
+        descriptor = descriptor \
             .with_format(OldCsv()) \
             .in_retract_mode()
 
@@ -535,7 +916,7 @@ class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescri
     def test_in_upsert_mode(self):
         descriptor = self.t_env.connect(FileSystem())
 
-        descriptor \
+        descriptor = descriptor \
             .with_format(OldCsv()) \
             .in_upsert_mode()
 
diff --git a/tools/travis_controller.sh b/tools/travis_controller.sh
index 6741d6a..f19fd8a 100755
--- a/tools/travis_controller.sh
+++ b/tools/travis_controller.sh
@@ -161,6 +161,8 @@ if [ $STAGE == "$STAGE_COMPILE" ]; then
             find "$CACHE_FLINK_DIR" -maxdepth 8 -type f -name '*.jar' \
             ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/lib/flink-dist*.jar" \
             ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-table*.jar" \
+            ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-elasticsearch-base/target/flink-*.jar" \
+            ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-kafka-base/target/flink-*.jar" \
             ! -path "$CACHE_FLINK_DIR/flink-table/flink-table-planner/target/flink-table-planner*tests.jar" | xargs rm -rf
     
             # .git directory