Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:03 UTC
[13/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/pom.xml b/flink-streaming-connectors/flink-connector-flume/pom.xml
deleted file mode 100644
index 1b1b810..0000000
--- a/flink-streaming-connectors/flink-connector-flume/pom.xml
+++ /dev/null
@@ -1,175 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-flume_2.10</artifactId>
- <name>flink-connector-flume</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <flume-ng.version>1.5.0</flume-ng.version>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>${flume-ng.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.tukaani</groupId>
- <artifactId>xz</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.velocity</groupId>
- <artifactId>velocity</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>shade-flink</id>
- <configuration>
- <artifactSet>
- <includes combine.children="append">
- <!-- We include all dependencies that transitively depend on guava -->
- <include>org.apache.flume:*</include>
- </includes>
- </artifactSet>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
deleted file mode 100644
index 2dc043b..0000000
--- a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.flume;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FlumeSink<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
-
- private transient FlinkRpcClientFacade client;
- boolean initDone = false;
- String host;
- int port;
- SerializationSchema<IN> schema;
-
- public FlumeSink(String host, int port, SerializationSchema<IN> schema) {
- this.host = host;
- this.port = port;
- this.schema = schema;
- }
-
- /**
- * Receives tuples from the Apache Flink {@link DataStream} and forwards
- * them to Apache Flume.
- *
- * @param value
- * The tuple arriving from the datastream
- */
- @Override
- public void invoke(IN value) {
-
- byte[] data = schema.serialize(value);
- client.sendDataToFlume(data);
-
- }
-
- private class FlinkRpcClientFacade {
- private RpcClient client;
- private String hostname;
- private int port;
-
- /**
- * Initializes the connection to Apache Flume.
- *
- * @param hostname
- * The host.
- * @param port
- * The port.
- */
- public void init(String hostname, int port) {
- // Setup the RPC connection
- this.hostname = hostname;
- this.port = port;
- int initCounter = 0;
- while (true) {
- if (initCounter >= 90) {
- throw new RuntimeException("Cannot establish connection with" + port + " at "
- + host);
- }
- try {
- this.client = RpcClientFactory.getDefaultInstance(hostname, port);
- } catch (FlumeException e) {
- // Wait one second before the next attempt if the
- // connection failed
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Interrupted while trying to connect {} at {}", port, host);
- }
- }
- }
- if (client != null) {
- break;
- }
- initCounter++;
- }
- initDone = true;
- }
-
- /**
- * Sends byte arrays as a series of {@link Event}s to Apache Flume.
- *
- * @param data
- * The byte array to send to Apache Flume
- */
- public void sendDataToFlume(byte[] data) {
- Event event = EventBuilder.withBody(data);
-
- try {
- client.append(event);
-
- } catch (EventDeliveryException e) {
- // clean up and recreate the client
- client.close();
- client = null;
- client = RpcClientFactory.getDefaultInstance(hostname, port);
- }
- }
-
- }
-
- @Override
- public void close() {
- client.client.close();
- }
-
- @Override
- public void open(Configuration config) {
- client = new FlinkRpcClientFacade();
- client.init(host, port);
- }
-}
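
For context, a minimal usage sketch of the FlumeSink removed above, assuming a Flume Avro source listening on localhost:4141 (placeholder host and port) and a stream of string elements:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.flume.FlumeSink;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class FlumeSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> stream = env.fromElements("a", "b", "c");

            // Each element is serialized by the SerializationSchema and forwarded to the
            // Flume agent; "localhost" and 4141 are placeholder values for an Avro source.
            stream.addSink(new FlumeSink<String>("localhost", 4141, new SimpleStringSchema()));

            env.execute("Flume sink example");
        }
    }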
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
deleted file mode 100644
index 04019f8..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ /dev/null
@@ -1,205 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
- <name>flink-connector-kafka-0.10</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <kafka.version>0.10.0.1</kafka.version>
- </properties>
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- Add Kafka 0.10.x as a dependency -->
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- <!-- Projects depending on this project,
- won't depend on flink-table. -->
- <optional>true</optional>
- </dependency>
-
- <!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <!-- exclude Kafka dependencies -->
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- </exclusion>
- </exclusions>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-base_2.10</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <!-- exclude Kafka dependencies -->
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- </exclusion>
- </exclusions>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <!-- include 0.10 server for tests -->
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- <version>${kafka.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests_2.10</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.10</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metrics-jmx</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- <configuration>
- <includes>
- <include>**/KafkaTestEnvironmentImpl*</include>
- </includes>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <executions>
- <execution>
- <id>attach-test-sources</id>
- <goals>
- <goal>test-jar-no-fork</goal>
- </goals>
- <configuration>
- <includes>
- <include>**/KafkaTestEnvironmentImpl*</include>
- </includes>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
- <forkCount>1</forkCount>
- <argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
deleted file mode 100644
index a9ce336..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.SerializedValue;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-
-/**
- * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
- * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull
- * data from one or more Kafka partitions.
- *
- * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
- * during a failure, and that the computation processes elements "exactly once".
- * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
- *
- * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
- * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
- * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
- * has consumed a topic.</p>
- *
- * <p>Please refer to Kafka's documentation for the available configuration properties:
- * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
- *
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
- * is constructed. That means that the client that submits the program needs to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
- */
-public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
-
- private static final long serialVersionUID = 2324564345203409112L;
-
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new Kafka streaming source consumer for Kafka 0.10.x.
- *
- * @param topic
- * The name of the topic that should be consumed.
- * @param valueDeserializer
- * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
- * @param props
- * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
- */
- public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
- this(Collections.singletonList(topic), valueDeserializer, props);
- }
-
- /**
- * Creates a new Kafka streaming source consumer for Kafka 0.10.x.
- *
- * This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
- * pairs, offsets, and topic names from Kafka.
- *
- * @param topic
- * The name of the topic that should be consumed.
- * @param deserializer
- * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
- * @param props
- * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
- */
- public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
- this(Collections.singletonList(topic), deserializer, props);
- }
-
- /**
- * Creates a new Kafka streaming source consumer for Kafka 0.10.x.
- *
- * This constructor allows passing multiple topics to the consumer.
- *
- * @param topics
- * The Kafka topics to read from.
- * @param deserializer
- * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
- * @param props
- * The properties that are used to configure both the fetcher and the offset handler.
- */
- public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
- this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
- }
-
- /**
- * Creates a new Kafka streaming source consumer for Kafka 0.10.x.
- *
- * This constructor allows passing multiple topics and a key/value deserialization schema.
- *
- * @param topics
- * The Kafka topics to read from.
- * @param deserializer
- * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
- * @param props
- * The properties that are used to configure both the fetcher and the offset handler.
- */
- public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
- super(topics, deserializer, props);
- }
-
- @Override
- protected AbstractFetcher<T, ?> createFetcher(
- SourceContext<T> sourceContext,
- List<KafkaTopicPartition> thisSubtaskPartitions,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
- SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- StreamingRuntimeContext runtimeContext) throws Exception {
-
- boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
-
- return new Kafka010Fetcher<>(
- sourceContext,
- thisSubtaskPartitions,
- watermarksPeriodic,
- watermarksPunctuated,
- runtimeContext.getProcessingTimeService(),
- runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
- runtimeContext.getUserCodeClassLoader(),
- runtimeContext.isCheckpointingEnabled(),
- runtimeContext.getTaskNameWithSubtasks(),
- runtimeContext.getMetricGroup(),
- deserializer,
- properties,
- pollTimeout,
- useMetrics);
- }
-}
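
To illustrate the constructors above, a minimal consumer job sketch; broker address, group id, and topic name are placeholders:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Kafka010ConsumerExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker list
            props.setProperty("group.id", "example-group");           // placeholder consumer group

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Single-topic constructor with a keyless DeserializationSchema; the other
            // constructors accept a topic list and/or a KeyedDeserializationSchema.
            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer010<>("example-topic", new SimpleStringSchema(), props));

            stream.print();
            env.execute("Kafka 0.10 consumer example");
        }
    }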
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
deleted file mode 100644
index cc0194b..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.util.Properties;
-
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
- *
- * Implementation note: This producer is a hybrid between a regular sink function (a)
- * and a custom operator (b).
- *
- * For (a), the class implements the SinkFunction and RichFunction interfaces.
- * For (b), it extends the StreamSink class.
- *
- * Details about approach (a):
- *
- * Producers for Kafka versions before 0.10 only follow approach (a), allowing users to add the
- * producer via the DataStream.addSink() method.
- * Since the APIs exposed in that variant do not allow accessing the timestamp attached to the record,
- * the Kafka 0.10 producer has a second invocation option, approach (b).
- *
- * Details about approach (b):
- * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
- * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
- * can access the internal record timestamp of the record and write it to Kafka.
- *
- * All methods and constructors in this class are marked with the approach they are needed for.
- */
-public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
-
- /**
- * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
- */
- private boolean writeTimestampToKafka = false;
-
- // ---------------------- "Constructors" for timestamp writing ------------------
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the DataStream to
- * the topic.
- *
- * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
- *
- * @param inStream The stream to write to Kafka
- * @param topicId ID of the Kafka topic.
- * @param serializationSchema User defined serialization schema supporting key/value messages
- * @param producerConfig Properties with the producer configuration.
- */
- public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
- String topicId,
- KeyedSerializationSchema<T> serializationSchema,
- Properties producerConfig) {
- return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
- }
-
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the DataStream to
- * the topic.
- *
- * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
- *
- * @param inStream The stream to write to Kafka
- * @param topicId ID of the Kafka topic.
- * @param serializationSchema User defined (keyless) serialization schema.
- * @param producerConfig Properties with the producer configuration.
- */
- public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
- String topicId,
- SerializationSchema<T> serializationSchema,
- Properties producerConfig) {
- return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the DataStream to
- * the topic.
- *
- * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
- *
- * @param inStream The stream to write to Kafka
- * @param topicId The name of the target topic
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
- */
- public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
- String topicId,
- KeyedSerializationSchema<T> serializationSchema,
- Properties producerConfig,
- KafkaPartitioner<T> customPartitioner) {
-
- GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
- FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
- SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
- return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
- }
-
- // ---------------------- Regular constructors w/o timestamp support ------------------
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the DataStream to
- * the topic.
- *
- * @param brokerList
- * Comma separated addresses of the brokers
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined (keyless) serialization schema.
- */
- public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the DataStream to
- * the topic.
- *
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined (keyless) serialization schema.
- * @param producerConfig
- * Properties with the producer configuration.
- */
- public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the DataStream to
- * the topic.
- *
- * @param topicId The topic to write data to
- * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
- */
- public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
- }
-
- // ------------------- Key/Value serialization schema constructors ----------------------
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the DataStream to
- * the topic.
- *
- * @param brokerList
- * Comma separated addresses of the brokers
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined serialization schema supporting key/value messages
- */
- public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
- this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the DataStream to
- * the topic.
- *
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined serialization schema supporting key/value messages
- * @param producerConfig
- * Properties with the producer configuration.
- */
- public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
- this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
- }
-
- /**
- * Creates a Kafka producer.
- *
- * This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
- */
- public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
- // We create a Kafka 09 producer instance here and only "override" (by intercepting) the
- // invoke call.
- super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
- }
-
-
- // ----------------------------- Generic element processing ---------------------------
-
- private void invokeInternal(T next, long elementTimestamp) throws Exception {
-
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-
- internalProducer.checkErroneous();
-
- byte[] serializedKey = internalProducer.schema.serializeKey(next);
- byte[] serializedValue = internalProducer.schema.serializeValue(next);
- String targetTopic = internalProducer.schema.getTargetTopic(next);
- if (targetTopic == null) {
- targetTopic = internalProducer.defaultTopicId;
- }
-
- Long timestamp = null;
- if (this.writeTimestampToKafka) {
- timestamp = elementTimestamp;
- }
-
- ProducerRecord<byte[], byte[]> record;
- if (internalProducer.partitioner == null) {
- record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
- } else {
- record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
- }
- if (internalProducer.flushOnCheckpoint) {
- synchronized (internalProducer.pendingRecordsLock) {
- internalProducer.pendingRecords++;
- }
- }
- internalProducer.producer.send(record, internalProducer.callback);
- }
-
-
- // ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----
-
-
- // ---- Configuration setters
-
- /**
- * Defines whether the producer should fail on errors, or only log them.
- * If this is set to true, then exceptions will only be logged; if set to false,
- * exceptions will eventually be thrown and cause the streaming program to
- * fail (and enter recovery).
- *
- * Method is only accessible for approach (a) (see above)
- *
- * @param logFailuresOnly The flag to indicate logging-only on exceptions.
- */
- public void setLogFailuresOnly(boolean logFailuresOnly) {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- internalProducer.setLogFailuresOnly(logFailuresOnly);
- }
-
- /**
- * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
- * to be acknowledged by the Kafka producer on a checkpoint.
- * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
- *
- * Method is only accessible for approach (a) (see above)
- *
- * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
- */
- public void setFlushOnCheckpoint(boolean flush) {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- internalProducer.setFlushOnCheckpoint(flush);
- }
-
- /**
- * This method is used for approach (a) (see above)
- *
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- internalProducer.open(parameters);
- }
-
- /**
- * This method is used for approach (a) (see above)
- */
- @Override
- public IterationRuntimeContext getIterationRuntimeContext() {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- return internalProducer.getIterationRuntimeContext();
- }
-
- /**
- * This method is used for approach (a) (see above)
- */
- @Override
- public void setRuntimeContext(RuntimeContext t) {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- internalProducer.setRuntimeContext(t);
- }
-
- /**
- * Invoke method for using the sink via DataStream.addSink().
- *
- * This method is used for approach (a) (see above)
- *
- * @param value The input record.
- */
- @Override
- public void invoke(T value) throws Exception {
- invokeInternal(value, Long.MAX_VALUE);
- }
-
-
- // ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----
-
-
- /**
- * Process method for using the sink with timestamp support.
- *
- * This method is used for approach (b) (see above)
- */
- @Override
- public void processElement(StreamRecord<T> element) throws Exception {
- invokeInternal(element.getValue(), element.getTimestamp());
- }
-
- /**
- * Configuration object returned by the writeToKafkaWithTimestamps() call.
- */
- public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
-
- private final FlinkKafkaProducerBase wrappedProducerBase;
- private final FlinkKafkaProducer010 producer;
-
- private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
- //noinspection unchecked
- super(stream, producer);
- this.producer = producer;
- this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction;
- }
-
- /**
- * Defines whether the producer should fail on errors, or only log them.
- * If this is set to true, then exceptions will only be logged; if set to false,
- * exceptions will eventually be thrown and cause the streaming program to
- * fail (and enter recovery).
- *
- * @param logFailuresOnly The flag to indicate logging-only on exceptions.
- */
- public void setLogFailuresOnly(boolean logFailuresOnly) {
- this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
- }
-
- /**
- * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
- * to be acknowledged by the Kafka producer on a checkpoint.
- * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
- *
- * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
- */
- public void setFlushOnCheckpoint(boolean flush) {
- this.wrappedProducerBase.setFlushOnCheckpoint(flush);
- }
-
- /**
- * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
- * Timestamps must be positive for Kafka to accept them.
- *
- * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
- */
- public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
- this.producer.writeTimestampToKafka = writeTimestampToKafka;
- }
- }
-
-
-}
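
The two invocation styles described in the class Javadoc look roughly like this in user code; broker address and topic name are placeholders:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Kafka010ProducerExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c");

            // Approach (a): plain sink function; invoke() passes Long.MAX_VALUE as the
            // timestamp, so no meaningful record timestamp is written to Kafka.
            stream.addSink(new FlinkKafkaProducer010<String>(
                    "localhost:9092", "example-topic", new SimpleStringSchema()));

            // Approach (b): custom operator with access to the records' timestamps.
            Properties producerProps =
                    FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092");
            FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
                    FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                            stream, "example-topic", new SimpleStringSchema(), producerProps);
            config.setWriteTimestampToKafka(true);

            env.execute("Kafka 0.10 producer example");
        }
    }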
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
deleted file mode 100644
index ddf1ad3..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.10.
- */
-public class Kafka010JsonTableSource extends Kafka09JsonTableSource {
-
- /**
- * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- public Kafka010JsonTableSource(
- String topic,
- Properties properties,
- String[] fieldNames,
- TypeInformation<?>[] fieldTypes) {
-
- super(topic, properties, fieldNames, fieldTypes);
- }
-
- /**
- * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- public Kafka010JsonTableSource(
- String topic,
- Properties properties,
- String[] fieldNames,
- Class<?>[] fieldTypes) {
-
- super(topic, properties, fieldNames, fieldTypes);
- }
-
- @Override
- FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
- return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
- }
-}
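
A construction sketch for the JSON table source above, assuming the usual imports; topic, field names, and field types are illustrative:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

    // Maps the "id" and "name" fields of each JSON record onto a two-field Row.
    Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
            "example-topic",
            props,
            new String[] { "id", "name" },
            new TypeInformation<?>[] {
                    BasicTypeInfo.LONG_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO });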
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
deleted file mode 100644
index 732440b..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.10.
- */
-public class Kafka010TableSource extends Kafka09TableSource {
-
- /**
- * Creates a Kafka 0.10 {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- public Kafka010TableSource(
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- String[] fieldNames,
- TypeInformation<?>[] fieldTypes) {
-
- super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
- }
-
- /**
- * Creates a Kafka 0.10 {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- public Kafka010TableSource(
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- String[] fieldNames,
- Class<?>[] fieldTypes) {
-
- super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
- }
-
- @Override
- FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
- return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
deleted file mode 100644
index 71dd29a..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
- *
- * <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally
- * takes the KafkaRecord-attached timestamp and attaches it to the Flink records.
- *
- * @param <T> The type of elements produced by the fetcher.
- */
-public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
-
- public Kafka010Fetcher(
- SourceContext<T> sourceContext,
- List<KafkaTopicPartition> assignedPartitions,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
- SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- ProcessingTimeService processingTimeProvider,
- long autoWatermarkInterval,
- ClassLoader userCodeClassLoader,
- boolean enableCheckpointing,
- String taskNameWithSubtasks,
- MetricGroup metricGroup,
- KeyedDeserializationSchema<T> deserializer,
- Properties kafkaProperties,
- long pollTimeout,
- boolean useMetrics) throws Exception
- {
- super(
- sourceContext,
- assignedPartitions,
- watermarksPeriodic,
- watermarksPunctuated,
- processingTimeProvider,
- autoWatermarkInterval,
- userCodeClassLoader,
- enableCheckpointing,
- taskNameWithSubtasks,
- metricGroup,
- deserializer,
- kafkaProperties,
- pollTimeout,
- useMetrics);
- }
-
- @Override
- protected void emitRecord(
- T record,
- KafkaTopicPartitionState<TopicPartition> partition,
- long offset,
- ConsumerRecord<?, ?> consumerRecord) throws Exception {
-
- // we attach the Kafka 0.10 timestamp here
- emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp());
- }
-
- /**
- * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
- * changing binary signatures.
- */
- @Override
- protected KafkaConsumerCallBridge010 createCallBridge() {
- return new KafkaConsumerCallBridge010();
- }
-
- @Override
- protected String getFetcherName() {
- return "Kafka 0.10 Fetcher";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
deleted file mode 100644
index a81b098..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.List;
-
-/**
- * The ConsumerCallBridge simply calls the {@link KafkaConsumer#assign(java.util.Collection)} method.
- *
- * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
- * changing {@code assign(List)} to {@code assign(Collection)}.
- *
- * Because of that, we need two versions whose compiled code goes against different method signatures.
- */
-public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
-
- @Override
- public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
- consumer.assign(topicPartitions);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
deleted file mode 100644
index 6bdfb48..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
deleted file mode 100644
index 6ee0429..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.TopicPartition;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-/**
- * Unit tests for the {@link Kafka010Fetcher}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConsumerThread.class)
-public class Kafka010FetcherTest {
-
- @Test
- public void testCommitDoesNotBlock() throws Exception {
-
- // test data
- final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
- final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
- testCommitData.put(testPartition, 11L);
-
- // latch to signal when the consumer is inside its blocking poll() call
- final OneShotLatch sync = new OneShotLatch();
-
- // ----- the mock consumer with blocking poll calls -----
- final MultiShotLatch blockerLatch = new MultiShotLatch();
-
- KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
- sync.trigger();
- blockerLatch.await();
- return ConsumerRecords.empty();
- }
- });
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- blockerLatch.trigger();
- return null;
- }
- }).when(mockConsumer).wakeup();
-
- // make sure the fetcher creates the mock consumer
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- create the test fetcher -----
-
- @SuppressWarnings("unchecked")
- SourceContext<String> sourceContext = mock(SourceContext.class);
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
- sourceContext,
- topics,
- null, /* periodic assigner */
- null, /* punctuated assigner */
- new TestProcessingTimeService(),
- 10,
- getClass().getClassLoader(),
- false, /* checkpointing */
- "taskname-with-subtask",
- new UnregisteredMetricsGroup(),
- schema,
- new Properties(),
- 0L,
- false);
-
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // wait until the fetcher has reached the method of interest
- sync.await();
-
- // ----- trigger the offset commit -----
-
- final AtomicReference<Throwable> commitError = new AtomicReference<>();
- final Thread committer = new Thread("committer runner") {
- @Override
- public void run() {
- try {
- fetcher.commitInternalOffsetsToKafka(testCommitData);
- } catch (Throwable t) {
- commitError.set(t);
- }
- }
- };
- committer.start();
-
- // ----- ensure that the committer finishes in time -----
- committer.join(30000);
- assertFalse("The committer did not finish in time", committer.isAlive());
-
- // ----- test done, wait till the fetcher is done for a clean shutdown -----
- fetcher.cancel();
- fetcherRunner.join();
-
- // check that there were no errors in the fetcher
- final Throwable fetcherError = error.get();
- if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
- throw new Exception("Exception in the fetcher", fetcherError);
- }
- final Throwable committerError = commitError.get();
- if (committerError != null) {
- throw new Exception("Exception in the committer", committerError);
- }
- }
-
- @Test
- public void ensureOffsetsGetCommitted() throws Exception {
-
- // test data
- final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
- final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
-
- final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
- testCommitData1.put(testPartition1, 11L);
- testCommitData1.put(testPartition2, 18L);
-
- final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
- testCommitData2.put(testPartition1, 19L);
- testCommitData2.put(testPartition2, 28L);
-
- final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
-
-
- // ----- the mock consumer with poll(), wakeup(), and commitAsync() calls -----
-
- final MultiShotLatch blockerLatch = new MultiShotLatch();
-
- KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
- blockerLatch.await();
- return ConsumerRecords.empty();
- }
- });
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- blockerLatch.trigger();
- return null;
- }
- }).when(mockConsumer).wakeup();
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- @SuppressWarnings("unchecked")
- Map<TopicPartition, OffsetAndMetadata> offsets =
- (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
-
- OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
-
- commitStore.add(offsets);
- callback.onComplete(offsets, null);
-
- return null;
- }
- }).when(mockConsumer).commitAsync(
- Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
-
- // make sure the fetcher creates the mock consumer
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- create the test fetcher -----
-
- @SuppressWarnings("unchecked")
- SourceContext<String> sourceContext = mock(SourceContext.class);
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
- StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-
- final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
- sourceContext,
- topics,
- null, /* periodic assigner */
- null, /* punctuated assigner */
- new TestProcessingTimeService(),
- 10,
- getClass().getClassLoader(),
- false, /* checkpointing */
- "taskname-with-subtask",
- new UnregisteredMetricsGroup(),
- schema,
- new Properties(),
- 0L,
- false);
-
-
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // ----- trigger the first offset commit -----
-
- fetcher.commitInternalOffsetsToKafka(testCommitData1);
- Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
-
- for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
- TopicPartition partition = entry.getKey();
- if (partition.topic().equals("test")) {
- assertEquals(42, partition.partition());
- assertEquals(12L, entry.getValue().offset());
- }
- else if (partition.topic().equals("another")) {
- assertEquals(99, partition.partition());
- assertEquals(18L, entry.getValue().offset());
- }
- }
-
- // ----- trigger the second offset commit -----
-
- fetcher.commitInternalOffsetsToKafka(testCommitData2);
- Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
-
- for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
- TopicPartition partition = entry.getKey();
- if (partition.topic().equals("test")) {
- assertEquals(42, partition.partition());
- assertEquals(20L, entry.getValue().offset());
- }
- else if (partition.topic().equals("another")) {
- assertEquals(99, partition.partition());
- assertEquals(28L, entry.getValue().offset());
- }
- }
-
- // ----- test done, wait till the fetcher is done for a clean shutdown -----
- fetcher.cancel();
- fetcherRunner.join();
-
- // check that there were no errors in the fetcher
- final Throwable caughtError = error.get();
- if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
- throw new Exception("Exception in the fetcher", caughtError);
- }
- }
-
- @Test
- public void testCancellationWhenEmitBlocks() throws Exception {
-
- // ----- some test data -----
-
- final String topic = "test-topic";
- final int partition = 3;
- final byte[] payload = new byte[] {1, 2, 3, 4};
-
- final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
- new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
-
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
- data.put(new TopicPartition(topic, partition), records);
-
- final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
-
- // ----- the test consumer -----
-
- final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
- return consumerRecords;
- }
- });
-
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- build a fetcher -----
-
- BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
- sourceContext,
- topics,
- null, /* periodic watermark extractor */
- null, /* punctuated watermark extractor */
- new TestProcessingTimeService(),
- 10, /* watermark interval */
- this.getClass().getClassLoader(),
- true, /* checkpointing */
- "task_name",
- new UnregisteredMetricsGroup(),
- schema,
- new Properties(),
- 0L,
- false);
-
-
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // wait until the fetcher thread has started emitting records to the source context
- sourceContext.waitTillHasBlocker();
-
- // now cancel the fetcher, including the interruption that is usually done on the task thread;
- // once it has finished, no thread may still be blocked on the source context
- fetcher.cancel();
- fetcherRunner.interrupt();
- fetcherRunner.join();
-
- assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
- }
-
- // ------------------------------------------------------------------------
- // test utilities
- // ------------------------------------------------------------------------
-
- private static final class BlockingSourceContext<T> implements SourceContext<T> {
-
- private final ReentrantLock lock = new ReentrantLock();
- private final OneShotLatch inBlocking = new OneShotLatch();
-
- @Override
- public void collect(T element) {
- block();
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- block();
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- block();
- }
-
- @Override
- public Object getCheckpointLock() {
- return new Object();
- }
-
- @Override
- public void close() {}
-
- public void waitTillHasBlocker() throws InterruptedException {
- inBlocking.await();
- }
-
- public boolean isStillBlocking() {
- return lock.isLocked();
- }
-
- @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
- private void block() {
- lock.lock();
- try {
- inBlocking.trigger();
-
- // put this thread to sleep indefinitely
- final Object o = new Object();
- while (true) {
- synchronized (o) {
- o.wait();
- }
- }
- }
- catch (InterruptedException e) {
- // exit cleanly, simply reset the interruption flag
- Thread.currentThread().interrupt();
- }
- finally {
- lock.unlock();
- }
- }
- }
-}
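Two details in the tests above are worth spelling out. First, the committed offsets asserted on are always the checkpointed offsets plus one (11 becomes 12, 19 becomes 20), matching Kafka's convention that a committed offset names the next record to read, not the last one processed. Second, all three tests rely on the same mock handshake: poll() is stubbed to park on a latch so the consumer thread can be held inside its blocking call, and wakeup(), the one KafkaConsumer method documented as safe to call from another thread, is stubbed to release that latch. A condensed sketch of the handshake, distilled from the tests (the helper class and method names are illustrative only):

    import org.apache.flink.core.testutils.MultiShotLatch;
    import org.apache.flink.core.testutils.OneShotLatch;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    import static org.mockito.Mockito.anyLong;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class BlockingPollMockSketch {

        public static KafkaConsumer<?, ?> createBlockingConsumer(
                final OneShotLatch inPoll, final MultiShotLatch blocker) {

            KafkaConsumer<?, ?> consumer = mock(KafkaConsumer.class);

            // poll() first signals that the consumer thread has entered the
            // blocking call, then parks until wakeup() releases it
            when(consumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
                @Override
                public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
                    inPoll.trigger();
                    blocker.await();
                    return ConsumerRecords.empty();
                }
            });

            // wakeup() simply releases the parked poll()
            doAnswer(new Answer<Void>() {
                @Override
                public Void answer(InvocationOnMock invocation) {
                    blocker.trigger();
                    return null;
                }
            }).when(consumer).wakeup();

            return consumer;
        }
    }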
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
deleted file mode 100644
index 08511c9..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-
-public class Kafka010ITCase extends KafkaConsumerTestBase {
-
- // ------------------------------------------------------------------------
- // Suite of Tests
- // ------------------------------------------------------------------------
-
-
- @Test(timeout = 60000)
- public void testFailOnNoBroker() throws Exception {
- runFailOnNoBrokerTest();
- }
-
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
- runSimpleConcurrentProducerConsumerTopology();
- }
-
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
- runKeyValueTest();
- }
-
- // --- canceling / failures ---
-
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
- runCancelingOnEmptyInputTest();
- }
-
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
- runCancelingOnFullInputTest();
- }
-
- @Test(timeout = 60000)
- public void testFailOnDeploy() throws Exception {
- runFailOnDeployTest();
- }
-
-
- // --- source to partition mappings and exactly once ---
-
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
- runOneToOneExactlyOnceTest();
- }
-
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
- runOneSourceMultiplePartitionsExactlyOnceTest();
- }
-
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
- runMultipleSourcesOnePartitionExactlyOnceTest();
- }
-
- // --- broker failure ---
-
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
- runBrokerFailureTest();
- }
-
- // --- special executions ---
-
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
- runBigRecordTestTopology();
- }
-
- @Test(timeout = 60000)
- public void testMultipleTopics() throws Exception {
- runProduceConsumeMultipleTopics();
- }
-
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
- runAllDeletesTest();
- }
-
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
- runEndOfStreamTest();
- }
-
- // --- offset committing ---
-
- @Test(timeout = 60000)
- public void testCommitOffsetsToKafka() throws Exception {
- runCommitOffsetsToKafka();
- }
-
- @Test(timeout = 60000)
- public void testStartFromKafkaCommitOffsets() throws Exception {
- runStartFromKafkaCommitOffsets();
- }
-
- @Test(timeout = 60000)
- public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
- runAutoOffsetRetrievalAndCommitToKafka();
- }
-
- /**
- * Kafka 0.10-specific test, ensuring that timestamps are properly written to and read from Kafka.
- */
- @Test(timeout = 60000)
- public void testTimestamps() throws Exception {
-
- final String topic = "tstopic";
- createTestTopic(topic, 3, 1);
-
- // ---------- Produce an event time stream into Kafka -------------------
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(1);
- env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
- env.getConfig().disableSysoutLogging();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
- boolean running = true;
-
- @Override
- public void run(SourceContext<Long> ctx) throws Exception {
- long i = 0;
- while (running) {
- ctx.collectWithTimestamp(i, i * 2);
- if (i++ == 1000L) {
- running = false;
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
- FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() {
- @Override
- public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- return (int)(next % 3);
- }
- });
- prod.setParallelism(3);
- prod.setWriteTimestampToKafka(true);
- env.execute("Produce some");
-
- // ---------- Consume stream from Kafka -------------------
-
- env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(1);
- env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
- env.getConfig().disableSysoutLogging();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- FlinkKafkaConsumer010<Long> kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps);
- kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
- @Nullable
- @Override
- public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
- if (lastElement % 10 == 0) {
- return new Watermark(lastElement);
- }
- return null;
- }
-
- @Override
- public long extractTimestamp(Long element, long previousElementTimestamp) {
- return previousElementTimestamp;
- }
- });
-
- DataStream<Long> stream = env.addSource(kafkaSource);
- GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
- stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
-
- env.execute("Consume again");
-
- deleteTestTopic(topic);
- }
-
- private static class TimestampValidatingOperator extends StreamSink<Long> {
-
- public TimestampValidatingOperator() {
- super(new SinkFunction<Long>() {
- @Override
- public void invoke(Long value) throws Exception {
- throw new RuntimeException("Unexpected");
- }
- });
- }
-
- long elCount = 0;
- long wmCount = 0;
- long lastWM = Long.MIN_VALUE;
-
- @Override
- public void processElement(StreamRecord<Long> element) throws Exception {
- elCount++;
- if (element.getValue() * 2 != element.getTimestamp()) {
- throw new RuntimeException("Invalid timestamp: " + element);
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- wmCount++;
-
- if (lastWM <= mark.getTimestamp()) {
- lastWM = mark.getTimestamp();
- } else {
- throw new RuntimeException("Received watermark lower than the last one");
- }
-
- if (mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE) {
- throw new RuntimeException("Invalid watermark: " + mark.getTimestamp());
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (elCount != 1000L) {
- throw new RuntimeException("Wrong final element count: " + elCount);
- }
-
- if (wmCount <= 2) {
- throw new RuntimeException("Too few watermarks were sent: " + wmCount);
- }
- }
- }
-
- private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
-
- private final TypeInformation<Long> ti;
- private final TypeSerializer<Long> ser;
- long cnt = 0;
-
- public LimitedLongDeserializer() {
- this.ti = TypeInfoParser.parse("Long");
- this.ser = ti.createSerializer(new ExecutionConfig());
- }
- @Override
- public TypeInformation<Long> getProducedType() {
- return ti;
- }
-
- @Override
- public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- cnt++;
- DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
- Long e = ser.deserialize(in);
- return e;
- }
-
- @Override
- public boolean isEndOfStream(Long nextElement) {
- return cnt > 1000L;
- }
- }
-
-}
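The timestamps test above exercises the message-level timestamps that Kafka introduced with the 0.10 message format: FlinkKafkaProducer010 attaches each element's Flink event time (here value * 2) to the Kafka message when setWriteTimestampToKafka(true) is set, and the consumer restores it as the record's timestamp. A minimal, stand-alone sketch of reading that timestamp back with the plain 0.10 consumer API (broker address, group id, and deserializer wiring are assumptions for illustration):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class TimestampPeek {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("group.id", "ts-peek");                 // assumed group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("tstopic"));
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // timestamp() and timestampType() are new in the 0.10 client;
                    // with setWriteTimestampToKafka(true), timestamp() carries the
                    // Flink event time the producer attached
                    System.out.printf("offset=%d timestamp=%d type=%s%n",
                            record.offset(), record.timestamp(), record.timestampType());
                }
            }
        }
    }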