Posted to commits@flink.apache.org by ji...@apache.org on 2019/05/31 03:27:10 UTC
[flink] branch master updated: [FLINK-12440][python] Add all connector support align Java Table API.
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 836fdff [FLINK-12440][python] Add all connector support align Java Table API.
836fdff is described below
commit 836fdfff0db64ff8241f38e8dd362dd50a9d1895
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Fri May 24 14:45:30 2019 +0800
[FLINK-12440][python] Add all connector support align Java Table API.
This closes #8531
---
.../main/flink-bin/bin/pyflink-gateway-server.sh | 2 +-
flink-python/pyflink/table/__init__.py | 4 +-
flink-python/pyflink/table/table_descriptor.py | 485 ++++++++++++++++++++-
.../pyflink/table/tests/test_descriptor.py | 407 ++++++++++++++++-
tools/travis_controller.sh | 2 +
5 files changed, 883 insertions(+), 17 deletions(-)
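
For orientation, a minimal sketch of how the Kafka descriptor added by this commit could be driven from Python, assuming a PyFlink build that contains this change and a reachable JVM gateway (the Python methods delegate to the Java Kafka descriptor); the topic name and connection values below are illustrative only:

    from pyflink.table import Kafka

    # Build the descriptor fluently; every method returns the descriptor itself.
    kafka = (Kafka()
             .version("0.11")
             .topic("user_events")  # placeholder topic name
             .properties({"zookeeper.connect": "localhost:2181",
                          "bootstrap.servers": "localhost:9092"})
             .property("group.id", "testGroup")
             .start_from_earliest())

    # to_properties() is the same inspection hook the new tests use to assert
    # the generated connector properties.
    print(kafka.to_properties())
    # -> includes 'connector.type': 'kafka', 'connector.version': '0.11',
    #    'connector.startup-mode': 'earliest-offset', ...
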
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
index 026f813..9e41ad5 100644
--- a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
+++ b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
@@ -63,7 +63,7 @@ if [[ -n "$FLINK_TESTING" ]]; then
else
FLINK_TEST_CLASSPATH="$FLINK_TEST_CLASSPATH":"$testJarFile"
fi
- done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d -name 'flink-*-tests.jar' -print0 | sort -z)
+ done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d \( -name 'flink-*-tests.jar' -o -path "${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-elasticsearch-base/target/flink*.jar" -o -path "${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-kafka-base/target/flink*.jar" \) -print0 | sort -z)
fi
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH}:${FLINK_TEST_CLASSPATH} ${DRIVER} ${ARGS[@]}
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 281647f..904264e 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -40,7 +40,7 @@ from pyflink.table.table_sink import TableSink, CsvTableSink
from pyflink.table.table_source import TableSource, CsvTableSource
from pyflink.table.types import DataTypes, UserDefinedType, Row
from pyflink.table.window import Tumble, Session, Slide, Over
-from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem
+from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem, Kafka, Elasticsearch
__all__ = [
'TableEnvironment',
@@ -63,4 +63,6 @@ __all__ = [
'FileSystem',
'UserDefinedType',
'Row',
+ 'Kafka',
+ 'Elasticsearch'
]
diff --git a/flink-python/pyflink/table/table_descriptor.py b/flink-python/pyflink/table/table_descriptor.py
index 1dfbde3..65161b4 100644
--- a/flink-python/pyflink/table/table_descriptor.py
+++ b/flink-python/pyflink/table/table_descriptor.py
@@ -30,7 +30,9 @@ __all__ = [
'Rowtime',
'Schema',
'OldCsv',
- 'FileSystem'
+ 'FileSystem',
+ 'Kafka',
+ 'Elasticsearch'
]
@@ -256,7 +258,7 @@ class OldCsv(FormatDescriptor):
format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use
the old one for stream/batch filesystem operations for now.
- .. note::
+ ..note::
Deprecated: use the RFC-compliant `Csv` format instead when writing to Kafka.
"""
@@ -373,6 +375,485 @@ class FileSystem(ConnectorDescriptor):
return self
+class Kafka(ConnectorDescriptor):
+ """
+ Connector descriptor for the Apache Kafka message queue.
+ """
+
+ def __init__(self):
+ gateway = get_gateway()
+ self._j_kafka = gateway.jvm.Kafka()
+ super(Kafka, self).__init__(self._j_kafka)
+
+ def version(self, version):
+ """
+ Sets the Kafka version to be used.
+
+ :param version: Kafka version. E.g., "0.8", "0.11", etc.
+ :return: This object.
+ """
+ if not isinstance(version, (str, unicode)):
+ version = str(version)
+ self._j_kafka = self._j_kafka.version(version)
+ return self
+
+ def topic(self, topic):
+ """
+ Sets the topic from which the table is read.
+
+ :param topic: The topic from which the table is read.
+ :return: This object.
+ """
+ self._j_kafka = self._j_kafka.topic(topic)
+ return self
+
+ def properties(self, property_dict):
+ """
+ Sets the configuration properties for the Kafka consumer. Resets previously set properties.
+
+ :param property_dict: The dict object containing configuration properties for the Kafka
+ consumer. Both the keys and values should be strings.
+ :return: This object.
+ """
+ gateway = get_gateway()
+ properties = gateway.jvm.java.util.Properties()
+ for key in property_dict:
+ properties.setProperty(key, property_dict[key])
+ self._j_kafka = self._j_kafka.properties(properties)
+ return self
+
+ def property(self, key, value):
+ """
+ Adds a configuration property for the Kafka consumer.
+
+ :param key: Property key string for the Kafka consumer.
+ :param value: Property value string for the Kafka consumer.
+ :return: This object.
+ """
+ self._j_kafka = self._j_kafka.property(key, value)
+ return self
+
+ def start_from_earliest(self):
+ """
+ Specifies the consumer to start reading from the earliest offset for all partitions.
+ This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+
+ This method does not affect where partitions are read from when the consumer is restored
+ from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+ savepoint, only the offsets in the restored state will be used.
+
+ :return: This object.
+ """
+ self._j_kafka = self._j_kafka.startFromEarliest()
+ return self
+
+ def start_from_latest(self):
+ """
+ Specifies the consumer to start reading from the latest offset for all partitions.
+ This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+
+ This method does not affect where partitions are read from when the consumer is restored
+ from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+ savepoint, only the offsets in the restored state will be used.
+
+ :return: This object.
+ """
+ self._j_kafka = self._j_kafka.startFromLatest()
+ return self
+
+ def start_from_group_offsets(self):
+ """
+ Specifies the consumer to start reading from any committed group offsets found
+ in Zookeeper / Kafka brokers. The "group.id" property must be set in the configuration
+ properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
+ set in the configuration properties will be used for the partition.
+
+ This method does not affect where partitions are read from when the consumer is restored
+ from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+ savepoint, only the offsets in the restored state will be used.
+
+ :return: This object.
+ """
+ self._j_kafka = self._j_kafka.startFromGroupOffsets()
+ return self
+
+ def start_from_specific_offsets(self, specific_offsets_dict):
+ """
+ Specifies the consumer to start reading partitions from specific offsets, set independently
+ for each partition. The specified offset should be the offset of the next record that will
+ be read from partitions. This lets the consumer ignore any committed group offsets in
+ Zookeeper / Kafka brokers.
+
+ If the provided map of offsets contains entries whose partition is not subscribed by the
+ consumer, the entry will be ignored. If the consumer subscribes to a partition that does
+ not exist in the provided map of offsets, the consumer will fall back to the default group
+ offset behaviour (see :func:`pyflink.table.table_descriptor.Kafka.start_from_group_offsets`)
+ for that particular partition.
+
+ If the specified offset for a partition is invalid, or the behaviour for that partition is
+ defaulted to group offsets but still no group offset could be found for it, then the
+ "auto.offset.reset" behaviour set in the configuration properties will be used for the
+ partition.
+
+ This method does not affect where partitions are read from when the consumer is restored
+ from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+ savepoint, only the offsets in the restored state will be used.
+
+ :param specific_offsets_dict: Dict of specific offsets whose keys are int-type partition
+ ids and values are int-type offsets.
+ :return: This object.
+ """
+ for key in specific_offsets_dict:
+ self.start_from_specific_offset(key, specific_offsets_dict[key])
+ return self
+
+ def start_from_specific_offset(self, partition, specific_offset):
+ """
+ Configures to start reading partitions from specific offsets and specifies the given offset
+ for the given partition.
+
+ see :func:`pyflink.table.table_descriptor.Kafka.start_from_specific_offsets`
+
+ :param partition: The partition to configure.
+ :param specific_offset: The offset of the next record that will be read from the partition.
+ :return: This object.
+ """
+ self._j_kafka = self._j_kafka.startFromSpecificOffset(int(partition), int(specific_offset))
+ return self
+
+ def sink_partitioner_fixed(self):
+ """
+ Configures how to partition records from Flink's partitions into Kafka's partitions.
+
+ This strategy ensures that each Flink partition ends up in one Kafka partition.
+
+ ..note::
+ One Kafka partition can contain multiple Flink partitions. Examples:
+
+ More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain
+ the output of more than one Flink partition:
+
+ | Flink Sinks --------- Kafka Partitions
+ | 1 ----------------> 1
+ | 2 --------------/
+ | 3 -------------/
+ | 4 ------------/
+
+ Fewer Flink partitions than Kafka partitions:
+
+ | Flink Sinks --------- Kafka Partitions
+ | 1 ----------------> 1
+ | 2 ----------------> 2
+ | 3
+ | 4
+ | 5
+
+ :return: This object.
+ """
+ self._j_kafka = self._j_kafka.sinkPartitionerFixed()
+ return self
+
+ def sink_partitioner_round_robin(self):
+ """
+ Configures how to partition records from Flink's partitions into Kafka's partitions.
+
+ This strategy ensures that records will be distributed to Kafka partitions in a
+ round-robin fashion.
+
+ ..note::
+ This strategy is useful to avoid an unbalanced partitioning. However, it will cause a
+ lot of network connections between all the Flink instances and all the Kafka brokers.
+
+ :return: This object.
+ """
+ self._j_kafka = self._j_kafka.sinkPartitionerRoundRobin()
+ return self
+
+ def sink_partitioner_custom(self, partitioner_class_name):
+ """
+ Configures how to partition records from Flink's partitions into Kafka's partitions.
+
+ This strategy allows for a custom partitioner by providing an implementation
+ of ``FlinkKafkaPartitioner``.
+
+ :param partitioner_class_name: The Java canonical class name of the FlinkKafkaPartitioner.
+ The FlinkKafkaPartitioner must have a public no-argument
+ constructor and can be found in the current Java
+ classloader.
+ :return: This object.
+ """
+ gateway = get_gateway()
+ self._j_kafka = self._j_kafka.sinkPartitionerCustom(
+ gateway.jvm.Thread.currentThread().getContextClassLoader()
+ .loadClass(partitioner_class_name))
+ return self
+
+
+class Elasticsearch(ConnectorDescriptor):
+ """
+ Connector descriptor for the Elasticsearch search engine.
+ """
+
+ def __init__(self):
+ gateway = get_gateway()
+ self._j_elasticsearch = gateway.jvm.Elasticsearch()
+ super(Elasticsearch, self).__init__(self._j_elasticsearch)
+
+ def version(self, version):
+ """
+ Sets the Elasticsearch version to be used. Required.
+
+ :param version: Elasticsearch version. E.g., "6".
+ :return: This object.
+ """
+ if not isinstance(version, (str, unicode)):
+ version = str(version)
+ self._j_elasticsearch = self._j_elasticsearch.version(version)
+ return self
+
+ def host(self, hostname, port, protocol):
+ """
+ Adds an Elasticsearch host to connect to. Required.
+
+ Multiple hosts can be declared by calling this method multiple times.
+
+ :param hostname: Connection hostname.
+ :param port: Connection port.
+ :param protocol: Connection protocol; e.g. "http".
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.host(hostname, int(port), protocol)
+ return self
+
+ def index(self, index):
+ """
+ Declares the Elasticsearch index for every record. Required.
+
+ :param index: Elasticsearch index.
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.index(index)
+ return self
+
+ def document_type(self, document_type):
+ """
+ Declares the Elasticsearch document type for every record. Required.
+
+ :param document_type: Elasticsearch document type.
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.documentType(document_type)
+ return self
+
+ def key_delimiter(self, key_delimiter):
+ """
+ Sets a custom key delimiter in case the Elasticsearch ID needs to be constructed from
+ multiple fields. Optional.
+
+ :param key_delimiter: Key delimiter; e.g., "$" would result in IDs "KEY1$KEY2$KEY3".
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.keyDelimiter(key_delimiter)
+ return self
+
+ def key_null_literal(self, key_null_literal):
+ """
+ Sets a custom representation for null fields in keys. Optional.
+
+ :param key_null_literal: Key null literal string; e.g. "N/A" would result in IDs
+ "KEY1_N/A_KEY3".
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.keyNullLiteral(key_null_literal)
+ return self
+
+ def failure_handler_fail(self):
+ """
+ Configures a failure handling strategy in case a request to Elasticsearch fails.
+
+ This strategy throws an exception if a request fails and thus causes a job failure.
+
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.failureHandlerFail()
+ return self
+
+ def failure_handler_ignore(self):
+ """
+ Configures a failure handling strategy in case a request to Elasticsearch fails.
+
+ This strategy ignores failures and drops the request.
+
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.failureHandlerIgnore()
+ return self
+
+ def failure_handler_retry_rejected(self):
+ """
+ Configures a failure handling strategy in case a request to Elasticsearch fails.
+
+ This strategy re-adds requests that have failed due to queue capacity saturation.
+
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.failureHandlerRetryRejected()
+ return self
+
+ def failure_handler_custom(self, failure_handler_class_name):
+ """
+ Configures a failure handling strategy in case a request to Elasticsearch fails.
+
+ This strategy allows for custom failure handling using an ``ActionRequestFailureHandler``.
+
+ :param failure_handler_class_name: The Java canonical class name of the
+ ActionRequestFailureHandler.
+ :return: This object.
+ """
+ gateway = get_gateway()
+ self._j_elasticsearch = self._j_elasticsearch.failureHandlerCustom(
+ gateway.jvm.Thread.currentThread().getContextClassLoader()
+ .loadClass(failure_handler_class_name))
+ return self
+
+ def disable_flush_on_checkpoint(self):
+ """
+ Disables flushing on checkpoint. When disabled, a sink will not wait for all pending action
+ requests to be acknowledged by Elasticsearch on checkpoints.
+
+ ..note::
+ If flushing on checkpoint is disabled, an Elasticsearch sink does NOT
+ provide any strong guarantees for at-least-once delivery of action requests.
+
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.disableFlushOnCheckpoint()
+ return self
+
+ def bulk_flush_max_actions(self, max_actions_num):
+ """
+ Configures how to buffer elements before sending them in bulk to the cluster for
+ efficiency.
+
+ Sets the maximum number of actions to buffer for each bulk request.
+
+ :param max_actions_num: the maximum number of actions to buffer per bulk request.
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.bulkFlushMaxActions(int(max_actions_num))
+ return self
+
+ def bulk_flush_max_size(self, max_size):
+ """
+ Configures how to buffer elements before sending them in bulk to the cluster for
+ efficiency.
+
+ Sets the maximum size of buffered actions per bulk request (using the syntax of
+ MemorySize).
+
+ :param max_size: The maximum size. E.g. "42 mb". Only MB granularity is supported.
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.bulkFlushMaxSize(max_size)
+ return self
+
+ def bulk_flush_interval(self, interval):
+ """
+ Configures how to buffer elements before sending them in bulk to the cluster for
+ efficiency.
+
+ Sets the bulk flush interval (in milliseconds).
+
+ :param interval: Bulk flush interval (in milliseconds).
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.bulkFlushInterval(int(interval))
+ return self
+
+ def bulk_flush_backoff_constant(self):
+ """
+ Configures how to buffer elements before sending them in bulk to the cluster for
+ efficiency.
+
+ Sets a constant backoff type to use when flushing bulk requests.
+
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffConstant()
+ return self
+
+ def bulk_flush_backoff_exponential(self):
+ """
+ Configures how to buffer elements before sending them in bulk to the cluster for
+ efficiency.
+
+ Sets an exponential backoff type to use when flushing bulk requests.
+
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffExponential()
+ return self
+
+ def bulk_flush_backoff_max_retries(self, max_retries):
+ """
+ Configures how to buffer elements before sending them in bulk to the cluster for
+ efficiency.
+
+ Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+
+ Make sure to enable backoff by selecting a strategy (
+ :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_constant` or
+ :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_exponential`).
+
+ :param max_retries: The maximum number of retries.
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffMaxRetries(int(max_retries))
+ return self
+
+ def bulk_flush_backoff_delay(self, delay):
+ """
+ Configures how to buffer elements before sending them in bulk to the cluster for
+ efficiency.
+
+ Sets the amount of delay between each backoff attempt when flushing bulk requests
+ (in milliseconds).
+
+ Make sure to enable backoff by selecting a strategy (
+ :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_constant` or
+ :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_exponential`).
+
+ :param delay: Delay between each backoff attempt (in milliseconds).
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffDelay(int(delay))
+ return self
+
+ def connection_max_retry_timeout(self, max_retry_timeout):
+ """
+ Sets connection properties to be used during REST communication to Elasticsearch.
+
+ Sets the maximum timeout (in milliseconds) in case of multiple retries of the same request.
+
+ :param max_retry_timeout: Maximum timeout (in milliseconds).
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.connectionMaxRetryTimeout(
+ int(max_retry_timeout))
+ return self
+
+ def connection_path_prefix(self, path_prefix):
+ """
+ Sets connection properties to be used during REST communication to Elasticsearch.
+
+ Adds a path prefix to every REST communication.
+
+ :param path_prefix: Prefix string to be added to every REST communication.
+ :return: This object.
+ """
+ self._j_elasticsearch = self._j_elasticsearch.connectionPathPrefix(path_prefix)
+ return self
+
+
class ConnectTableDescriptor(Descriptor):
"""
Common class for tables created with :class:`pyflink.table.TableEnvironment.connect`.
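
The Elasticsearch descriptor above can be exercised the same way as Kafka. A sketch under the same assumptions (the values mirror those used in the new tests; the index and document type are placeholders):

    from pyflink.table import Elasticsearch

    es = (Elasticsearch()
          .version("6")
          .host("localhost", 9200, "http")
          .index("MyUsers")            # placeholder index name
          .document_type("user")       # placeholder document type
          .failure_handler_retry_rejected()
          .bulk_flush_max_actions(42)
          .bulk_flush_backoff_exponential()
          .bulk_flush_backoff_max_retries(3))

    print(es.to_properties())
    # -> includes 'connector.type': 'elasticsearch', 'connector.version': '6',
    #    'connector.hosts.0.hostname': 'localhost',
    #    'connector.bulk-flush.max-actions': '42', ...
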
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 4fcc355..c9370fa 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -17,7 +17,8 @@
################################################################################
import os
-from pyflink.table.table_descriptor import (FileSystem, OldCsv, Rowtime, Schema)
+from pyflink.table.table_descriptor import (FileSystem, OldCsv, Rowtime, Schema, Kafka,
+ Elasticsearch)
from pyflink.table.table_sink import CsvTableSink
from pyflink.table.types import DataTypes
from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase,
@@ -29,7 +30,7 @@ class FileSystemDescriptorTests(PyFlinkTestCase):
def test_path(self):
file_system = FileSystem()
- file_system.path("/test.csv")
+ file_system = file_system.path("/test.csv")
properties = file_system.to_properties()
expected = {'connector.property-version': '1',
@@ -38,12 +39,392 @@ class FileSystemDescriptorTests(PyFlinkTestCase):
assert properties == expected
+class KafkaDescriptorTests(PyFlinkTestCase):
+
+ def test_version(self):
+ kafka = Kafka()
+
+ kafka = kafka.version("0.11")
+
+ properties = kafka.to_properties()
+ expected = {'connector.version': '0.11',
+ 'connector.type': 'kafka',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_topic(self):
+ kafka = Kafka()
+
+ kafka = kafka.topic("topic1")
+
+ properties = kafka.to_properties()
+ expected = {'connector.type': 'kafka',
+ 'connector.topic': 'topic1',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_properties(self):
+ kafka = Kafka()
+
+ kafka = kafka.properties({"zookeeper.connect": "localhost:2181",
+ "bootstrap.servers": "localhost:9092"})
+
+ properties = kafka.to_properties()
+ expected = {'connector.type': 'kafka',
+ 'connector.properties.0.key': 'zookeeper.connect',
+ 'connector.properties.0.value': 'localhost:2181',
+ 'connector.properties.1.key': 'bootstrap.servers',
+ 'connector.properties.1.value': 'localhost:9092',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_property(self):
+ kafka = Kafka()
+
+ kafka = kafka.property("group.id", "testGroup")
+
+ properties = kafka.to_properties()
+ expected = {'connector.type': 'kafka',
+ 'connector.properties.0.key': 'group.id',
+ 'connector.properties.0.value': 'testGroup',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_start_from_earliest(self):
+ kafka = Kafka()
+
+ kafka = kafka.start_from_earliest()
+
+ properties = kafka.to_properties()
+ expected = {'connector.type': 'kafka',
+ 'connector.startup-mode': 'earliest-offset',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_start_from_latest(self):
+ kafka = Kafka()
+
+ kafka = kafka.start_from_latest()
+
+ properties = kafka.to_properties()
+ expected = {'connector.type': 'kafka',
+ 'connector.startup-mode': 'latest-offset',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_start_from_group_offsets(self):
+ kafka = Kafka()
+
+ kafka = kafka.start_from_group_offsets()
+
+ properties = kafka.to_properties()
+ expected = {'connector.type': 'kafka',
+ 'connector.startup-mode': 'group-offsets',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_start_from_specific_offsets(self):
+ kafka = Kafka()
+
+ kafka = kafka.start_from_specific_offsets({1: 220, 3: 400})
+
+ properties = kafka.to_properties()
+ expected = {'connector.startup-mode': 'specific-offsets',
+ 'connector.specific-offsets.0.partition': '1',
+ 'connector.specific-offsets.0.offset': '220',
+ 'connector.specific-offsets.1.partition': '3',
+ 'connector.specific-offsets.1.offset': '400',
+ 'connector.type': 'kafka',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_start_from_specific_offset(self):
+ kafka = Kafka()
+
+ kafka = kafka.start_from_specific_offset(3, 300)
+
+ properties = kafka.to_properties()
+ expected = {'connector.startup-mode': 'specific-offsets',
+ 'connector.specific-offsets.0.partition': '3',
+ 'connector.specific-offsets.0.offset': '300',
+ 'connector.type': 'kafka',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_sink_partitioner_fixed(self):
+ kafka = Kafka()
+
+ kafka = kafka.sink_partitioner_fixed()
+
+ properties = kafka.to_properties()
+ expected = {'connector.sink-partitioner': 'fixed',
+ 'connector.type': 'kafka',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_sink_partitioner_custom(self):
+ kafka = Kafka()
+
+ kafka = kafka.sink_partitioner_custom(
+ "org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner")
+
+ properties = kafka.to_properties()
+ expected = {'connector.sink-partitioner': 'custom',
+ 'connector.sink-partitioner-class':
+ 'org.apache.flink.streaming.connectors.kafka.partitioner.'
+ 'FlinkFixedPartitioner',
+ 'connector.type': 'kafka',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_sink_partitioner_round_robin(self):
+ kafka = Kafka()
+
+ kafka = kafka.sink_partitioner_round_robin()
+
+ properties = kafka.to_properties()
+ expected = {'connector.sink-partitioner': 'round-robin',
+ 'connector.type': 'kafka',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+
+class ElasticsearchDescriptorTest(PyFlinkTestCase):
+
+ def test_version(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.version("6")
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.type': 'elasticsearch',
+ 'connector.version': '6',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_host(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.host("localhost", 9200, "http")
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.hosts.0.hostname': 'localhost',
+ 'connector.hosts.0.port': '9200',
+ 'connector.hosts.0.protocol': 'http',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_index(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.index("MyUsers")
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.index': 'MyUsers',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_document_type(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.document_type("user")
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.document-type': 'user',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_key_delimiter(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.key_delimiter("$")
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.key-delimiter': '$',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_key_null_literal(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.key_null_literal("n/a")
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.key-null-literal': 'n/a',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_failure_handler_fail(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.failure_handler_fail()
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.failure-handler': 'fail',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_failure_handler_ignore(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.failure_handler_ignore()
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.failure-handler': 'ignore',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_failure_handler_retry_rejected(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.failure_handler_retry_rejected()
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.failure-handler': 'retry-rejected',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_failure_handler_custom(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.failure_handler_custom(
+ "org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler")
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.failure-handler': 'custom',
+ 'connector.failure-handler-class':
+ 'org.apache.flink.streaming.connectors.elasticsearch.util.'
+ 'IgnoringFailureHandler',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_disable_flush_on_checkpoint(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.disable_flush_on_checkpoint()
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.flush-on-checkpoint': 'false',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_bulk_flush_max_actions(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.bulk_flush_max_actions(42)
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.bulk-flush.max-actions': '42',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_bulk_flush_max_size(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.bulk_flush_max_size("42 mb")
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.bulk-flush.max-size': '44040192 bytes',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+
+ assert properties == expected
+
+ def test_bulk_flush_interval(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.bulk_flush_interval(2000)
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.bulk-flush.interval': '2000',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_bulk_flush_backoff_exponential(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.bulk_flush_backoff_exponential()
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.bulk-flush.backoff.type': 'exponential',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_bulk_flush_backoff_constant(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.bulk_flush_backoff_constant()
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.bulk-flush.backoff.type': 'constant',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_bulk_flush_backoff_max_retries(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.bulk_flush_backoff_max_retries(3)
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.bulk-flush.backoff.max-retries': '3',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_bulk_flush_backoff_delay(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.bulk_flush_backoff_delay(30000)
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.bulk-flush.backoff.delay': '30000',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_connection_max_retry_timeout(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.connection_max_retry_timeout(3000)
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.connection-max-retry-timeout': '3000',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_connection_path_prefix(self):
+ elasticsearch = Elasticsearch()
+
+ elasticsearch = elasticsearch.connection_path_prefix("/v1")
+
+ properties = elasticsearch.to_properties()
+ expected = {'connector.connection-path-prefix': '/v1',
+ 'connector.type': 'elasticsearch',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+
class OldCsvDescriptorTests(PyFlinkTestCase):
def test_field_delimiter(self):
csv = OldCsv()
- csv.field_delimiter("|")
+ csv = csv.field_delimiter("|")
properties = csv.to_properties()
expected = {'format.field-delimiter': '|',
@@ -54,7 +435,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
def test_line_delimiter(self):
csv = OldCsv()
- csv.line_delimiter(";")
+ csv = csv.line_delimiter(";")
expected = {'format.type': 'csv',
'format.property-version': '1',
@@ -66,7 +447,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
def test_ignore_parse_errors(self):
csv = OldCsv()
- csv.ignore_parse_errors()
+ csv = csv.ignore_parse_errors()
properties = csv.to_properties()
expected = {'format.ignore-parse-errors': 'true',
@@ -77,7 +458,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
def test_quote_character(self):
csv = OldCsv()
- csv.quote_character("*")
+ csv = csv.quote_character("*")
properties = csv.to_properties()
expected = {'format.quote-character': '*',
@@ -88,7 +469,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
def test_comment_prefix(self):
csv = OldCsv()
- csv.comment_prefix("#")
+ csv = csv.comment_prefix("#")
properties = csv.to_properties()
expected = {'format.comment-prefix': '#',
@@ -99,7 +480,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
def test_ignore_first_line(self):
csv = OldCsv()
- csv.ignore_first_line()
+ csv = csv.ignore_first_line()
properties = csv.to_properties()
expected = {'format.ignore-first-line': 'true',
@@ -363,7 +744,7 @@ class AbstractTableDescriptorTests(object):
def test_with_format(self):
descriptor = self.t_env.connect(FileSystem())
- descriptor.with_format(OldCsv().field("a", "INT"))
+ descriptor = descriptor.with_format(OldCsv().field("a", "INT"))
properties = descriptor.to_properties()
@@ -378,7 +759,7 @@ class AbstractTableDescriptorTests(object):
def test_with_schema(self):
descriptor = self.t_env.connect(FileSystem())
- descriptor.with_format(OldCsv()).with_schema(Schema().field("a", "INT"))
+ descriptor = descriptor.with_format(OldCsv()).with_schema(Schema().field("a", "INT"))
properties = descriptor.to_properties()
expected = {'schema.0.name': 'a',
@@ -505,7 +886,7 @@ class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescri
def test_in_append_mode(self):
descriptor = self.t_env.connect(FileSystem())
- descriptor\
+ descriptor = descriptor\
.with_format(OldCsv())\
.in_append_mode()
@@ -520,7 +901,7 @@ class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescri
def test_in_retract_mode(self):
descriptor = self.t_env.connect(FileSystem())
- descriptor \
+ descriptor = descriptor \
.with_format(OldCsv()) \
.in_retract_mode()
@@ -535,7 +916,7 @@ class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescri
def test_in_upsert_mode(self):
descriptor = self.t_env.connect(FileSystem())
- descriptor \
+ descriptor = descriptor \
.with_format(OldCsv()) \
.in_upsert_mode()
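
A recurring point in the test updates above: every descriptor method returns the descriptor, so the tests now capture the return value rather than relying on in-place mutation. Combined with the connect() API that these tests already exercise, a stream-table pipeline using the new Kafka descriptor might look like the following sketch (t_env is assumed to be an existing StreamTableEnvironment; the format and schema fields are illustrative):

    from pyflink.table import Kafka, OldCsv, Schema

    # t_env: an existing StreamTableEnvironment (not created here).
    descriptor = (t_env.connect(Kafka()
                                .version("0.11")
                                .topic("user_events")  # placeholder topic
                                .start_from_group_offsets())
                  .with_format(OldCsv().field("a", "INT"))
                  .with_schema(Schema().field("a", "INT"))
                  .in_append_mode())

    # The resulting property map combines connector, format and schema keys.
    print(descriptor.to_properties())
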
diff --git a/tools/travis_controller.sh b/tools/travis_controller.sh
index 6741d6a..f19fd8a 100755
--- a/tools/travis_controller.sh
+++ b/tools/travis_controller.sh
@@ -161,6 +161,8 @@ if [ $STAGE == "$STAGE_COMPILE" ]; then
find "$CACHE_FLINK_DIR" -maxdepth 8 -type f -name '*.jar' \
! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/lib/flink-dist*.jar" \
! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-table*.jar" \
+ ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-elasticsearch-base/target/flink-*.jar" \
+ ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-kafka-base/target/flink-*.jar" \
! -path "$CACHE_FLINK_DIR/flink-table/flink-table-planner/target/flink-table-planner*tests.jar" | xargs rm -rf
# .git directory