Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:00 UTC
[10/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
deleted file mode 100644
index c28799c..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
-
- @Test(timeout=60000)
- public void testAutoOffsetReset() throws Exception {
- runAutoOffsetResetTest();
- }
-
- @Test(timeout=60000)
- public void testAutoOffsetResetNone() throws Exception {
- runFailOnAutoOffsetResetNoneEager();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
deleted file mode 100644
index 6235449..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.common.KafkaException;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * An implementation of the KafkaServerProvider for Kafka 0.8.
- */
-public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
-
- protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
- private File tmpZkDir;
- private File tmpKafkaParent;
- private List<File> tmpKafkaDirs;
- private List<KafkaServer> brokers;
- private TestingServer zookeeper;
- private String zookeeperConnectionString;
- private String brokerConnectionString = "";
- private Properties standardProps;
- private Properties additionalServerProperties;
-
- public String getBrokerConnectionString() {
- return brokerConnectionString;
- }
-
- @Override
- public Properties getStandardProperties() {
- return standardProps;
- }
-
- @Override
- public Properties getSecureProperties() {
- return null;
- }
-
- @Override
- public String getVersion() {
- return "0.8";
- }
-
- @Override
- public List<KafkaServer> getBrokers() {
- return brokers;
- }
-
- @Override
- public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
- return new FlinkKafkaConsumer08<>(topics, readSchema, props);
- }
-
- @Override
- public <T> StreamSink<T> getProducerSink(
- String topic,
- KeyedSerializationSchema<T> serSchema,
- Properties props,
- KafkaPartitioner<T> partitioner) {
- FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(
- topic,
- serSchema,
- props,
- partitioner);
- prod.setFlushOnCheckpoint(true);
- return new StreamSink<>(prod);
- }
-
- @Override
- public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
- FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
- prod.setFlushOnCheckpoint(true);
- return stream.addSink(prod);
- }
-
- @Override
- public KafkaOffsetHandler createOffsetHandler(Properties props) {
- return new KafkaOffsetHandlerImpl(props);
- }
-
- @Override
- public void restartBroker(int leaderId) throws Exception {
- brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
- }
-
- @Override
- public int getLeaderToShutDown(String topic) throws Exception {
- ZkClient zkClient = createZkClient();
- PartitionMetadata firstPart = null;
- do {
- if (firstPart != null) {
- LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
- // not the first try. Sleep a bit
- Thread.sleep(150);
- }
-
- Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
- firstPart = partitionMetadata.head();
- }
- while (firstPart.errorCode() != 0);
- zkClient.close();
-
- return firstPart.leader().get().id();
- }
-
- @Override
- public int getBrokerId(KafkaServer server) {
- return server.socketServer().brokerId();
- }
-
- @Override
- public boolean isSecureRunSupported() {
- return false;
- }
-
-
- @Override
- public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
- this.additionalServerProperties = additionalServerProperties;
- File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
- tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
- try {
- Files.createDirectories(tmpZkDir.toPath());
- } catch (IOException e) {
- fail("cannot create zookeeper temp dir: " + e.getMessage());
- }
-
- tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir" + (UUID.randomUUID().toString()));
- try {
- Files.createDirectories(tmpKafkaParent.toPath());
- } catch (IOException e) {
- fail("cannot create kafka temp dir: " + e.getMessage());
- }
-
- tmpKafkaDirs = new ArrayList<>(numKafkaServers);
- for (int i = 0; i < numKafkaServers; i++) {
- File tmpDir = new File(tmpKafkaParent, "server-" + i);
- assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
- tmpKafkaDirs.add(tmpDir);
- }
-
- zookeeper = null;
- brokers = null;
-
- try {
- LOG.info("Starting Zookeeper");
- zookeeper = new TestingServer(-1, tmpZkDir);
- zookeeperConnectionString = zookeeper.getConnectString();
-
- LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(numKafkaServers);
-
- for (int i = 0; i < numKafkaServers; i++) {
- brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
- SocketServer socketServer = brokers.get(i).socketServer();
-
- String host = socketServer.host() == null ? "localhost" : socketServer.host();
- brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ",";
- }
-
- LOG.info("ZK and KafkaServer started.");
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Test setup failed: " + t.getMessage());
- }
-
- standardProps = new Properties();
- standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
- standardProps.setProperty("bootstrap.servers", brokerConnectionString);
- standardProps.setProperty("group.id", "flink-tests");
- standardProps.setProperty("auto.commit.enable", "false");
- standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
- standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
- standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. (smallest is kafka 0.8)
- standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
- }
-
- @Override
- public void shutdown() {
- if (brokers != null) {
- for (KafkaServer broker : brokers) {
- if (broker != null) {
- broker.shutdown();
- }
- }
- brokers.clear();
- }
-
- if (zookeeper != null) {
- try {
- zookeeper.stop();
- zookeeper.close();
- }
- catch (Exception e) {
- LOG.warn("ZK.stop() failed", e);
- }
- zookeeper = null;
- }
-
- // clean up the temp spaces
-
- if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
- try {
- FileUtils.deleteDirectory(tmpKafkaParent);
- }
- catch (Exception e) {
- // ignore
- }
- }
- if (tmpZkDir != null && tmpZkDir.exists()) {
- try {
- FileUtils.deleteDirectory(tmpZkDir);
- }
- catch (Exception e) {
- // ignore
- }
- }
- }
-
- @Override
- public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
- // create topic with one client
- LOG.info("Creating topic {}", topic);
-
- ZkClient creator = createZkClient();
-
- AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig);
- creator.close();
-
- // validate that the topic has been created
- final long deadline = System.currentTimeMillis() + 30000;
- do {
- try {
- Thread.sleep(100);
- }
- catch (InterruptedException e) {
- // restore interrupted state
- }
- List<KafkaTopicPartitionLeader> partitions = FlinkKafkaConsumer08.getPartitionsForTopic(Collections.singletonList(topic), standardProps);
- if (partitions != null && partitions.size() > 0) {
- return;
- }
- }
- while (System.currentTimeMillis() < deadline);
- fail ("Test topic could not be created");
- }
-
- @Override
- public void deleteTestTopic(String topic) {
- LOG.info("Deleting topic {}", topic);
-
- ZkClient zk = createZkClient();
- AdminUtils.deleteTopic(zk, topic);
- zk.close();
- }
-
- private ZkClient createZkClient() {
- return new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
- Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
- }
-
- /**
- * Only for the 0.8 server do we need access to the ZooKeeper client.
- */
- public CuratorFramework createCuratorClient() {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
- CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy);
- curatorClient.start();
- return curatorClient;
- }
-
- /**
- * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
- */
- protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
- LOG.info("Starting broker with id {}", brokerId);
- Properties kafkaProperties = new Properties();
-
- // properties have to be Strings
- kafkaProperties.put("advertised.host.name", KAFKA_HOST);
- kafkaProperties.put("broker.id", Integer.toString(brokerId));
- kafkaProperties.put("log.dir", tmpFolder.toString());
- kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
- kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
- kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
-
- // for CI stability, increase zookeeper session timeout
- kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
- kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
- if (additionalServerProperties != null) {
- kafkaProperties.putAll(additionalServerProperties);
- }
-
- final int numTries = 5;
-
- for (int i = 1; i <= numTries; i++) {
- int kafkaPort = NetUtils.getAvailablePort();
- kafkaProperties.put("port", Integer.toString(kafkaPort));
- KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
- try {
- KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
- server.startup();
- return server;
- }
- catch (KafkaException e) {
- if (e.getCause() instanceof BindException) {
- // port conflict, retry...
- LOG.info("Port conflict when starting Kafka Broker. Retrying...");
- }
- else {
- throw e;
- }
- }
- }
-
- throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
- }
-
- private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
-
- private final CuratorFramework offsetClient;
- private final String groupId;
-
- public KafkaOffsetHandlerImpl(Properties props) {
- offsetClient = createCuratorClient();
- groupId = props.getProperty("group.id");
- }
-
- @Override
- public Long getCommittedOffset(String topicName, int partition) {
- try {
- return ZookeeperOffsetHandler.getOffsetFromZooKeeper(offsetClient, groupId, topicName, partition);
- } catch (Exception e) {
- throw new RuntimeException("Exception when getting offsets from Zookeeper", e);
- }
- }
-
- @Override
- public void close() {
- offsetClient.close();
- }
- }
-
-}
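
For context, a minimal sketch (not part of this commit) of how a test might drive the environment deleted above: start a single embedded broker, create a topic, run against it, and tear everything down. The topic name, partition count, and job body are illustrative placeholders; only the KafkaTestEnvironmentImpl methods shown in the diff are used.

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;

    public class KafkaTestEnvironmentSketch {
        public static void main(String[] args) throws Exception {
            KafkaTestEnvironmentImpl env = new KafkaTestEnvironmentImpl();
            env.prepare(1, null, false); // one broker, no extra server properties, no secure mode
            try {
                env.createTestTopic("sketch-topic", 2, 1, new Properties());
                String brokers = env.getBrokerConnectionString(); // "host:port," list assembled in prepare()
                Properties consumerProps = env.getStandardProperties();
                // ... run a Flink job against 'brokers' / 'consumerProps' here ...
                env.deleteTestTopic("sketch-topic");
            } finally {
                env.shutdown(); // stops brokers and ZooKeeper, deletes the temp dirs
            }
        }
    }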
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
deleted file mode 100644
index 6298c92..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
+++ /dev/null
@@ -1,603 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.*;
-import static java.util.Arrays.asList;
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-
-public class ClosableBlockingQueueTest {
-
- // ------------------------------------------------------------------------
- // single-threaded unit tests
- // ------------------------------------------------------------------------
-
- @Test
- public void testCreateQueueHashCodeEquals() {
- try {
- ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>();
- ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>(22);
-
- assertTrue(queue1.isOpen());
- assertTrue(queue2.isOpen());
- assertTrue(queue1.isEmpty());
- assertTrue(queue2.isEmpty());
- assertEquals(0, queue1.size());
- assertEquals(0, queue2.size());
-
- assertTrue(queue1.hashCode() == queue2.hashCode());
- //noinspection EqualsWithItself
- assertTrue(queue1.equals(queue1));
- //noinspection EqualsWithItself
- assertTrue(queue2.equals(queue2));
- assertTrue(queue1.equals(queue2));
-
- assertNotNull(queue1.toString());
- assertNotNull(queue2.toString());
-
- List<String> elements = new ArrayList<>();
- elements.add("a");
- elements.add("b");
- elements.add("c");
-
- ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>(elements);
- ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>(asList("a", "b", "c"));
-
- assertTrue(queue3.isOpen());
- assertTrue(queue4.isOpen());
- assertFalse(queue3.isEmpty());
- assertFalse(queue4.isEmpty());
- assertEquals(3, queue3.size());
- assertEquals(3, queue4.size());
-
- assertTrue(queue3.hashCode() == queue4.hashCode());
- //noinspection EqualsWithItself
- assertTrue(queue3.equals(queue3));
- //noinspection EqualsWithItself
- assertTrue(queue4.equals(queue4));
- assertTrue(queue3.equals(queue4));
-
- assertNotNull(queue3.toString());
- assertNotNull(queue4.toString());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testCloseEmptyQueue() {
- try {
- ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
- assertTrue(queue.isOpen());
- assertTrue(queue.close());
- assertFalse(queue.isOpen());
-
- assertFalse(queue.addIfOpen("element"));
- assertTrue(queue.isEmpty());
-
- try {
- queue.add("some element");
- fail("should cause an exception");
- } catch (IllegalStateException ignored) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testCloseNonEmptyQueue() {
- try {
- ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>(asList(1, 2, 3));
- assertTrue(queue.isOpen());
-
- assertFalse(queue.close());
- assertFalse(queue.close());
-
- queue.poll();
-
- assertFalse(queue.close());
- assertFalse(queue.close());
-
- queue.pollBatch();
-
- assertTrue(queue.close());
- assertFalse(queue.isOpen());
-
- assertFalse(queue.addIfOpen(42));
- assertTrue(queue.isEmpty());
-
- try {
- queue.add(99);
- fail("should cause an exception");
- } catch (IllegalStateException ignored) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPeekAndPoll() {
- try {
- ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
-
- assertNull(queue.peek());
- assertNull(queue.peek());
- assertNull(queue.poll());
- assertNull(queue.poll());
-
- assertEquals(0, queue.size());
-
- queue.add("a");
- queue.add("b");
- queue.add("c");
-
- assertEquals(3, queue.size());
-
- assertEquals("a", queue.peek());
- assertEquals("a", queue.peek());
- assertEquals("a", queue.peek());
-
- assertEquals(3, queue.size());
-
- assertEquals("a", queue.poll());
- assertEquals("b", queue.poll());
-
- assertEquals(1, queue.size());
-
- assertEquals("c", queue.peek());
- assertEquals("c", queue.peek());
-
- assertEquals("c", queue.poll());
-
- assertEquals(0, queue.size());
- assertNull(queue.poll());
- assertNull(queue.peek());
- assertNull(queue.peek());
-
- assertTrue(queue.close());
-
- try {
- queue.peek();
- fail("should cause an exception");
- } catch (IllegalStateException ignored) {
- // expected
- }
-
- try {
- queue.poll();
- fail("should cause an exception");
- } catch (IllegalStateException ignored) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPollBatch() {
- try {
- ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
-
- assertNull(queue.pollBatch());
-
- queue.add("a");
- queue.add("b");
-
- assertEquals(asList("a", "b"), queue.pollBatch());
- assertNull(queue.pollBatch());
-
- queue.add("c");
-
- assertEquals(singletonList("c"), queue.pollBatch());
- assertNull(queue.pollBatch());
-
- assertTrue(queue.close());
-
- try {
- queue.pollBatch();
- fail("should cause an exception");
- } catch (IllegalStateException ignored) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testGetElementBlocking() {
- try {
- ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
-
- assertNull(queue.getElementBlocking(1));
- assertNull(queue.getElementBlocking(3));
- assertNull(queue.getElementBlocking(2));
-
- assertEquals(0, queue.size());
-
- queue.add("a");
- queue.add("b");
- queue.add("c");
- queue.add("d");
- queue.add("e");
- queue.add("f");
-
- assertEquals(6, queue.size());
-
- assertEquals("a", queue.getElementBlocking(99));
- assertEquals("b", queue.getElementBlocking());
-
- assertEquals(4, queue.size());
-
- assertEquals("c", queue.getElementBlocking(0));
- assertEquals("d", queue.getElementBlocking(1000000));
- assertEquals("e", queue.getElementBlocking());
- assertEquals("f", queue.getElementBlocking(1786598));
-
- assertEquals(0, queue.size());
-
- assertNull(queue.getElementBlocking(1));
- assertNull(queue.getElementBlocking(3));
- assertNull(queue.getElementBlocking(2));
-
- assertTrue(queue.close());
-
- try {
- queue.getElementBlocking();
- fail("should cause an exception");
- } catch (IllegalStateException ignored) {
- // expected
- }
-
- try {
- queue.getElementBlocking(1000000000L);
- fail("should cause an exception");
- } catch (IllegalStateException ignored) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testGetBatchBlocking() {
- try {
- ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
-
- assertEquals(emptyList(), queue.getBatchBlocking(1));
- assertEquals(emptyList(), queue.getBatchBlocking(3));
- assertEquals(emptyList(), queue.getBatchBlocking(2));
-
- queue.add("a");
- queue.add("b");
-
- assertEquals(asList("a", "b"), queue.getBatchBlocking(900000009));
-
- queue.add("c");
- queue.add("d");
-
- assertEquals(asList("c", "d"), queue.getBatchBlocking());
-
- assertEquals(emptyList(), queue.getBatchBlocking(2));
-
- queue.add("e");
-
- assertEquals(singletonList("e"), queue.getBatchBlocking(0));
-
- queue.add("f");
-
- assertEquals(singletonList("f"), queue.getBatchBlocking(1000000000));
-
- assertEquals(0, queue.size());
-
- assertEquals(emptyList(), queue.getBatchBlocking(1));
- assertEquals(emptyList(), queue.getBatchBlocking(3));
- assertEquals(emptyList(), queue.getBatchBlocking(2));
-
- assertTrue(queue.close());
-
- try {
- queue.getBatchBlocking();
- fail("should cause an exception");
- } catch (IllegalStateException ignored) {
- // expected
- }
-
- try {
- queue.getBatchBlocking(1000000000L);
- fail("should cause an exception");
- } catch (IllegalStateException ignored) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
- // multi-threaded tests
- // ------------------------------------------------------------------------
-
- @Test
- public void notifyOnClose() {
- try {
- final long oneYear = 365L * 24 * 60 * 60 * 1000;
-
- // test "getBatchBlocking()"
- final ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>();
- QueueCall call1 = new QueueCall() {
- @Override
- public void call() throws Exception {
- queue1.getBatchBlocking();
- }
- };
- testCallExitsOnClose(call1, queue1);
-
- // test "getBatchBlocking()"
- final ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>();
- QueueCall call2 = new QueueCall() {
- @Override
- public void call() throws Exception {
- queue2.getBatchBlocking(oneYear);
- }
- };
- testCallExitsOnClose(call2, queue2);
-
- // test "getBatchBlocking()"
- final ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>();
- QueueCall call3 = new QueueCall() {
- @Override
- public void call() throws Exception {
- queue3.getElementBlocking();
- }
- };
- testCallExitsOnClose(call3, queue3);
-
- // test "getBatchBlocking()"
- final ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>();
- QueueCall call4 = new QueueCall() {
- @Override
- public void call() throws Exception {
- queue4.getElementBlocking(oneYear);
- }
- };
- testCallExitsOnClose(call4, queue4);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Test
- public void testMultiThreadedAddGet() {
- try {
- final ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>();
- final AtomicReference<Throwable> pushErrorRef = new AtomicReference<>();
- final AtomicReference<Throwable> pollErrorRef = new AtomicReference<>();
-
- final int numElements = 2000;
-
- Thread pusher = new Thread("pusher") {
-
- @Override
- public void run() {
- try {
- final Random rnd = new Random();
- for (int i = 0; i < numElements; i++) {
- queue.add(i);
-
- // sleep a bit, sometimes
- int sleepTime = rnd.nextInt(3);
- if (sleepTime > 1) {
- Thread.sleep(sleepTime);
- }
- }
-
- while (true) {
- if (queue.close()) {
- break;
- } else {
- Thread.sleep(5);
- }
- }
- } catch (Throwable t) {
- pushErrorRef.set(t);
- }
- }
- };
- pusher.start();
-
- Thread poller = new Thread("poller") {
-
- @SuppressWarnings("InfiniteLoopStatement")
- @Override
- public void run() {
- try {
- int count = 0;
-
- try {
- final Random rnd = new Random();
- int nextExpected = 0;
-
- while (true) {
- int getMethod = count % 7;
- switch (getMethod) {
- case 0: {
- Integer next = queue.getElementBlocking(1);
- if (next != null) {
- assertEquals(nextExpected, next.intValue());
- nextExpected++;
- count++;
- }
- break;
- }
- case 1: {
- List<Integer> nextList = queue.getBatchBlocking();
- for (Integer next : nextList) {
- assertNotNull(next);
- assertEquals(nextExpected, next.intValue());
- nextExpected++;
- count++;
- }
- break;
- }
- case 2: {
- List<Integer> nextList = queue.getBatchBlocking(1);
- if (nextList != null) {
- for (Integer next : nextList) {
- assertNotNull(next);
- assertEquals(nextExpected, next.intValue());
- nextExpected++;
- count++;
- }
- }
- break;
- }
- case 3: {
- Integer next = queue.poll();
- if (next != null) {
- assertEquals(nextExpected, next.intValue());
- nextExpected++;
- count++;
- }
- break;
- }
- case 4: {
- List<Integer> nextList = queue.pollBatch();
- if (nextList != null) {
- for (Integer next : nextList) {
- assertNotNull(next);
- assertEquals(nextExpected, next.intValue());
- nextExpected++;
- count++;
- }
- }
- break;
- }
- default: {
- Integer next = queue.getElementBlocking();
- assertNotNull(next);
- assertEquals(nextExpected, next.intValue());
- nextExpected++;
- count++;
- }
- }
-
- // sleep a bit, sometimes
- int sleepTime = rnd.nextInt(3);
- if (sleepTime > 1) {
- Thread.sleep(sleepTime);
- }
- }
- } catch (IllegalStateException e) {
- // we get this once the queue is closed
- assertEquals(numElements, count);
- }
- } catch (Throwable t) {
- pollErrorRef.set(t);
- }
- }
- };
- poller.start();
-
- pusher.join();
- poller.join();
-
- if (pushErrorRef.get() != null) {
- Throwable t = pushErrorRef.get();
- t.printStackTrace();
- fail("Error in pusher: " + t.getMessage());
- }
- if (pollErrorRef.get() != null) {
- Throwable t = pollErrorRef.get();
- t.printStackTrace();
- fail("Error in poller: " + t.getMessage());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
- // Utils
- // ------------------------------------------------------------------------
-
- private static void testCallExitsOnClose(
- final QueueCall call, ClosableBlockingQueue<String> queue) throws Exception {
-
- final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- try {
- call.call();
- } catch (Throwable t) {
- errorRef.set(t);
- }
- }
- };
-
- Thread thread = new Thread(runnable);
- thread.start();
- Thread.sleep(100);
- queue.close();
- thread.join();
-
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- Throwable cause = errorRef.get();
- assertTrue(cause instanceof IllegalStateException);
- }
-
- private interface QueueCall {
- void call() throws Exception;
- }
-}
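
The tests above pin down the queue's shutdown contract: close() succeeds only on an empty queue, and a blocked getBatchBlocking() call exits with an IllegalStateException once the queue is closed. A minimal sketch of that producer/consumer handshake, assuming only the ClosableBlockingQueue API exercised above:

    import java.util.List;

    import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;

    public class ClosableBlockingQueueSketch {
        public static void main(String[] args) throws Exception {
            final ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>();

            Thread consumer = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            // blocks while the queue is open and empty
                            List<Integer> batch = queue.getBatchBlocking();
                            System.out.println("drained: " + batch);
                        }
                    } catch (IllegalStateException e) {
                        // thrown once close() succeeds: the normal shutdown path
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            consumer.start();

            for (int i = 0; i < 100; i++) {
                queue.add(i);
            }
            while (!queue.close()) {
                Thread.sleep(5); // close() returns false while elements remain
            }
            consumer.join();
        }
    }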
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
deleted file mode 100644
index fbeb110..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-log4j.logger.org.apache.zookeeper=OFF, testlogger
-log4j.logger.state.change.logger=OFF, testlogger
-log4j.logger.kafka=OFF, testlogger
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
- <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
deleted file mode 100644
index f638c7a..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ /dev/null
@@ -1,212 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
- <name>flink-connector-kafka-0.9</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <kafka.version>0.9.0.1</kafka.version>
- </properties>
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-base_2.10</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- <!-- Projects depending on this project,
- won't depend on flink-table. -->
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka.version}</version>
- </dependency>
-
- <!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-base_2.10</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <!-- exclude 0.8 dependencies -->
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- </exclusion>
- </exclusions>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <!-- include 0.9 server for tests -->
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- <version>${kafka.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metrics-jmx</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests_2.10</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.10</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minikdc</artifactId>
- <version>${minikdc.version}</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- <configuration>
- <includes>
- <include>**/KafkaTestEnvironmentImpl*</include>
- </includes>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <executions>
- <execution>
- <id>attach-test-sources</id>
- <goals>
- <goal>test-jar-no-fork</goal>
- </goals>
- <configuration>
- <includes>
- <include>**/KafkaTestEnvironmentImpl*</include>
- </includes>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
- <forkCount>1</forkCount>
- <argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
- </configuration>
- </plugin>
- <!--
- https://issues.apache.org/jira/browse/DIRSHARED-134
- Required to pull the Mini-KDC transitive dependency
- -->
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <version>3.0.1</version>
- <inherited>true</inherited>
- <extensions>true</extensions>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
deleted file mode 100644
index 29bb8e4..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.SerializedValue;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
- * Apache Kafka 0.9.x. The consumer can run in multiple parallel instances, each of which will pull
- * data from one or more Kafka partitions.
- *
- * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
- * during a failure, and that the computation processes elements "exactly once".
- * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
- *
- * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
- * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
- * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
- * has consumed a topic.</p>
- *
- * <p>Please refer to Kafka's documentation for the available configuration properties:
- * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
- *
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
- * is constructed. That means that the client that submits the program needs to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
- */
-public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
-
- private static final long serialVersionUID = 2324564345203409112L;
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
-
- /** Configuration key to change the polling timeout */
- public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
-
-
- /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
- * available. If 0, returns immediately with any records that are available now. */
- public static final long DEFAULT_POLL_TIMEOUT = 100L;
-
- // ------------------------------------------------------------------------
-
- /** User-supplied properties for Kafka */
- protected final Properties properties;
-
- /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
- * available. If 0, returns immediately with any records that are available now */
- protected final long pollTimeout;
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new Kafka streaming source consumer for Kafka 0.9.x
- *
- * @param topic
- * The name of the topic that should be consumed.
- * @param valueDeserializer
- * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
- * @param props
- * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
- */
- public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
- this(Collections.singletonList(topic), valueDeserializer, props);
- }
-
- /**
- * Creates a new Kafka streaming source consumer for Kafka 0.9.x
- *
- * This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
- * pairs, offsets, and topic names from Kafka.
- *
- * @param topic
- * The name of the topic that should be consumed.
- * @param deserializer
- * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
- * @param props
- * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
- */
- public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
- this(Collections.singletonList(topic), deserializer, props);
- }
-
- /**
- * Creates a new Kafka streaming source consumer for Kafka 0.9.x
- *
- * This constructor allows passing multiple topics to the consumer.
- *
- * @param topics
- * The Kafka topics to read from.
- * @param deserializer
- * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
- * @param props
- * The properties that are used to configure both the fetcher and the offset handler.
- */
- public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
- this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
- }
-
- /**
- * Creates a new Kafka streaming source consumer for Kafka 0.9.x
- *
- * This constructor allows passing multiple topics and a key/value deserialization schema.
- *
- * @param topics
- * The Kafka topics to read from.
- * @param deserializer
- * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
- * @param props
- * The properties that are used to configure both the fetcher and the offset handler.
- */
- public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
- super(topics, deserializer);
-
- this.properties = checkNotNull(props, "props");
- setDeserializer(this.properties);
-
- // configure the polling timeout
- try {
- if (properties.containsKey(KEY_POLL_TIMEOUT)) {
- this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
- } else {
- this.pollTimeout = DEFAULT_POLL_TIMEOUT;
- }
- }
- catch (Exception e) {
- throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
- }
- }
-
- @Override
- protected AbstractFetcher<T, ?> createFetcher(
- SourceContext<T> sourceContext,
- List<KafkaTopicPartition> thisSubtaskPartitions,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
- SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- StreamingRuntimeContext runtimeContext) throws Exception {
-
- boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
-
- return new Kafka09Fetcher<>(
- sourceContext,
- thisSubtaskPartitions,
- watermarksPeriodic,
- watermarksPunctuated,
- runtimeContext.getProcessingTimeService(),
- runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
- runtimeContext.getUserCodeClassLoader(),
- runtimeContext.isCheckpointingEnabled(),
- runtimeContext.getTaskNameWithSubtasks(),
- runtimeContext.getMetricGroup(),
- deserializer,
- properties,
- pollTimeout,
- useMetrics);
-
- }
-
- @Override
- protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
- // read the partitions that belong to the listed topics
- final List<KafkaTopicPartition> partitions = new ArrayList<>();
-
- try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
- for (final String topic: topics) {
- // get partitions for each topic
- List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
- // for non-existing topics, the list might be null.
- if (partitionsForTopic != null) {
- partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
- }
- }
- }
-
- if (partitions.isEmpty()) {
- throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
- }
-
- // we now have a list of partitions which is the same for all parallel consumer instances.
- LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics);
-
- if (LOG.isInfoEnabled()) {
- logPartitionInfo(LOG, partitions);
- }
-
- return partitions;
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Converts a list of Kafka PartitionInfos to Flink's KafkaTopicPartitions (which are serializable).
- *
- * @param partitions A list of Kafka PartitionInfos.
- * @return A list of KafkaTopicPartitions
- */
- private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
- checkNotNull(partitions);
-
- List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size());
- for (PartitionInfo pi : partitions) {
- ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
- }
- return ret;
- }
-
- /**
- * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
- *
- * @param props The Kafka properties to register the serializer in.
- */
- private static void setDeserializer(Properties props) {
- final String deSerName = ByteArrayDeserializer.class.getCanonicalName();
-
- Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
- Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-
- if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
- LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
- }
- if (valDeSer != null && !valDeSer.equals(deSerName)) {
- LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- }
-
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
- }
-}
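
As a usage sketch (not part of the commit), the consumer deleted above would typically be attached to a job as below. Broker address, group id, and topic are placeholders, and the sketch assumes the SimpleStringSchema that shipped with Flink at the time; only constructors and constants shown in the diff are used.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Kafka09ConsumerSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // offsets are snapshotted as part of checkpoints, as the Javadoc above explains
            env.enableCheckpointing(5000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "sketch-group");            // placeholder
            props.setProperty(FlinkKafkaConsumer09.KEY_POLL_TIMEOUT, "100"); // optional, see above

            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer09<>("sketch-topic", new SimpleStringSchema(), props));
            stream.print();
            env.execute("kafka-0.9-consumer-sketch");
        }
    }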
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
deleted file mode 100644
index 2a3e39d..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.9.
- *
- * Please note that this producer does not have any reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
-
- private static final long serialVersionUID = 1L;
-
- // ------------------- Keyless serialization schema constructors ----------------------
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
- * the topic.
- *
- * @param brokerList
- * Comma separated addresses of the brokers
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined (keyless) serialization schema.
- */
- public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
- * the topic.
- *
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined (keyless) serialization schema.
- * @param producerConfig
- * Properties with the producer configuration.
- */
- public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the elements of a
- * DataStream to the topic.
- *
- * @param topicId The topic to write data to
- * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
- */
- public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
- }
-
- // ------------------- Key/Value serialization schema constructors ----------------------
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the elements of a
- * DataStream to the topic.
- *
- * @param brokerList
- * Comma separated addresses of the brokers
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined serialization schema supporting key/value messages
- */
- public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
- this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the elements of a
- * DataStream to the topic.
- *
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined serialization schema supporting key/value messages
- * @param producerConfig
- * Properties with the producer configuration.
- */
- public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
- this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink writes the elements of a
- * DataStream to the topic.
- *
- * @param topicId The topic to write data to
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
- */
- public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
- super(topicId, serializationSchema, producerConfig, customPartitioner);
- }
-
- @Override
- protected void flush() {
- if (this.producer != null) {
- producer.flush();
- }
- }
-}
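
For context, the removed producer is attached to a job as a regular DataStream
sink. Below is a minimal usage sketch; the broker address, topic name, and the
example String stream are illustrative assumptions, not part of this commit:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ProducerExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // illustrative input stream
            DataStream<String> stream = env.fromElements("a", "b", "c");

            // write each String to the topic; SimpleStringSchema turns it into
            // bytes, and the default FixedPartitioner gives each parallel sink
            // subtask a single target partition
            stream.addSink(new FlinkKafkaProducer09<String>(
                    "localhost:9092",         // broker address (assumption)
                    "my-topic",               // target topic (assumption)
                    new SimpleStringSchema()));

            env.execute("FlinkKafkaProducer09 example");
        }
    }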
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
deleted file mode 100644
index 38ea47c..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
- */
-public class Kafka09JsonTableSink extends KafkaJsonTableSink {
- /**
- * Creates a {@link KafkaTableSink} for Kafka 0.9.
- *
- * @param topic topic in Kafka to which table is written
- * @param properties properties to connect to Kafka
- * @param partitioner Kafka partitioner
- */
- public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
- super(topic, properties, partitioner);
- }
-
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
- }
-
- @Override
- protected Kafka09JsonTableSink createCopy() {
- return new Kafka09JsonTableSink(topic, properties, partitioner);
- }
-}
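
For context, this sink is handed a Table from the Table API. A minimal sketch,
assuming a local broker, an existing Table named 'result', and the
Table.writeToSink call from this development line (all assumptions):

    import java.util.Properties;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");  // assumption

    Kafka09JsonTableSink sink =
            new Kafka09JsonTableSink("my-topic", props, new FixedPartitioner<Row>());

    // 'result' is an assumed Table from a StreamTableEnvironment; each row is
    // serialized as a JSON object and written to the topic
    result.writeToSink(sink);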
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
deleted file mode 100644
index 975ef58..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * A {@link StreamTableSource} for Kafka 0.9.
- */
-public class Kafka09JsonTableSource extends KafkaJsonTableSource {
-
- /**
- * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- public Kafka09JsonTableSource(
- String topic,
- Properties properties,
- String[] fieldNames,
- TypeInformation<?>[] fieldTypes) {
-
- super(topic, properties, fieldNames, fieldTypes);
- }
-
- /**
- * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- public Kafka09JsonTableSource(
- String topic,
- Properties properties,
- String[] fieldNames,
- Class<?>[] fieldTypes) {
-
- super(topic, properties, fieldNames, fieldTypes);
- }
-
- @Override
- FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
- return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
- }
-}
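
For context, this JSON table source is registered with a table environment
before use. A minimal sketch; the topic, group id, field layout, and the
'tableEnv' variable (an assumed StreamTableEnvironment) are illustrative:

    import java.util.Properties;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");  // assumption
    props.setProperty("group.id", "example-group");            // assumption

    // field names and types describe the flat JSON objects in the topic
    Kafka09JsonTableSource source = new Kafka09JsonTableSource(
            "my-topic",
            props,
            new String[] { "id", "name" },
            new Class<?>[] { Long.class, String.class });

    // register under a name usable in Table API / SQL queries
    tableEnv.registerTableSource("kafkaJson", source);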
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
deleted file mode 100644
index 03b5040..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * A {@link StreamTableSource} for Kafka 0.9.
- */
-public class Kafka09TableSource extends KafkaTableSource {
-
- /**
- * Creates a Kafka 0.9 {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- public Kafka09TableSource(
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- String[] fieldNames,
- TypeInformation<?>[] fieldTypes) {
-
- super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
- }
-
- /**
- * Creates a Kafka 0.9 {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- public Kafka09TableSource(
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- String[] fieldNames,
- Class<?>[] fieldTypes) {
-
- super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
- }
-
- @Override
- FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
- return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
- }
-}
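
This generic table source differs from the JSON variant only in taking a
user-supplied DeserializationSchema<Row>. A minimal sketch; 'props' is the
consumer configuration from the previous example and 'myRowSchema' is an
assumed schema implementation, not part of this commit:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    // 'myRowSchema' is an assumed DeserializationSchema<Row> that maps each
    // Kafka byte[] record to a Row with the two fields declared below
    Kafka09TableSource source = new Kafka09TableSource(
            "my-topic",
            props,
            myRowSchema,
            new String[] { "id", "name" },
            new TypeInformation<?>[] {
                    BasicTypeInfo.LONG_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO });

    tableEnv.registerTableSource("kafkaRows", source);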
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
deleted file mode 100644
index e6e3c51..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.ThreadSafe;
-import java.io.Closeable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The Handover is a utility to hand over data (a buffer of records) and exceptions from a
- * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a
- * "size one blocking queue", with some extras around exception reporting, closing, and
- * waking up threads without {@link Thread#interrupt() interrupting} them.
- *
- * <p>This class is used in the Flink Kafka Consumer to hand over data and exceptions between
- * the thread that runs the KafkaConsumer class and the main thread.
- *
- * <p>The Handover has the notion of "waking up" the producer thread with a {@link WakeupException}
- * rather than a thread interrupt.
- *
- * <p>The Handover can also be "closed", signalling from one thread to the other that
- * the thread has terminated.
- */
-@ThreadSafe
-public final class Handover implements Closeable {
-
- private final Object lock = new Object();
-
- private ConsumerRecords<byte[], byte[]> next;
- private Throwable error;
- private boolean wakeupProducer;
-
- /**
- * Polls the next element from the Handover, possibly blocking until the next element is
- * available. This method behaves similarly to polling from a blocking queue.
- *
- * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then
- * that exception is thrown rather than an element being returned.
- *
- * @return The next element (buffer of records, never null).
- *
- * @throws ClosedException Thrown if the Handover was {@link #close() closed}.
- * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
- */
- @Nonnull
- public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
- synchronized (lock) {
- while (next == null && error == null) {
- lock.wait();
- }
-
- ConsumerRecords<byte[], byte[]> n = next;
- if (n != null) {
- next = null;
- lock.notifyAll();
- return n;
- }
- else {
- ExceptionUtils.rethrowException(error, error.getMessage());
-
- // this statement cannot be reached since the above method always throws an exception
- // this is only here to silence the compiler and any warnings
- return ConsumerRecords.empty();
- }
- }
- }
-
- /**
- * Hands over an element from the producer. If the Handover already has an element that was
- * not yet picked up by the consumer thread, this call blocks until the consumer picks up that
- * previous element.
- *
- * <p>This behavior is similar to a "size one" blocking queue.
- *
- * @param element The next element to hand over.
- *
- * @throws InterruptedException
- * Thrown, if the thread is interrupted while blocking for the Handover to be empty.
- * @throws WakeupException
- * Thrown, if the {@link #wakeupProducer()} method is called while blocking for
- * the Handover to be empty.
- * @throws ClosedException
- * Thrown if the Handover was closed or concurrently being closed.
- */
- public void produce(final ConsumerRecords<byte[], byte[]> element)
- throws InterruptedException, WakeupException, ClosedException {
-
- checkNotNull(element);
-
- synchronized (lock) {
- while (next != null && !wakeupProducer) {
- lock.wait();
- }
-
- wakeupProducer = false;
-
- // if there is still an element, we must have been woken up
- if (next != null) {
- throw new WakeupException();
- }
- // if there is no error, the handover is open and can accept this element
- else if (error == null) {
- next = element;
- lock.notifyAll();
- }
- // an error marks this as closed for the producer
- else {
- throw new ClosedException();
- }
- }
- }
-
- /**
- * Reports an exception. The consumer will throw the given exception immediately, if
- * it is currently blocked in the {@link #pollNext()} method, or the next time it
- * calls that method.
- *
- * <p>After this method has been called, no call to either {@link #produce(ConsumerRecords)}
- * or {@link #pollNext()} will ever return normally any more; both will always complete
- * exceptionally.
- *
- * <p>If another exception was already reported, this method does nothing.
- *
- * <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
- *
- * @param t The exception to report.
- */
- public void reportError(Throwable t) {
- checkNotNull(t);
-
- synchronized (lock) {
- // do not override the initial exception
- if (error == null) {
- error = t;
- }
- next = null;
- lock.notifyAll();
- }
- }
-
- /**
- * Closes the handover. Both the {@link #produce(ConsumerRecords)} method and the
- * {@link #pollNext()} will throw a {@link ClosedException} on any currently blocking and
- * future invocations.
- *
- * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
- * that exception will not be overridden. The consumer thread will throw that exception upon
- * calling {@link #pollNext()}, rather than the {@code ClosedException}.
- */
- @Override
- public void close() {
- synchronized (lock) {
- next = null;
- wakeupProducer = false;
-
- if (error == null) {
- error = new ClosedException();
- }
- lock.notifyAll();
- }
- }
-
- /**
- * Wakes the producer thread up. If the producer thread is currently blocked in
- * the {@link #produce(ConsumerRecords)} method, it will exit the method throwing
- * a {@link WakeupException}.
- */
- public void wakeupProducer() {
- synchronized (lock) {
- wakeupProducer = true;
- lock.notifyAll();
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * An exception thrown by the Handover in the {@link #pollNext()} or
- * {@link #produce(ConsumerRecords)} method, after the Handover was closed via
- * {@link #close()}.
- */
- public static final class ClosedException extends Exception {
- private static final long serialVersionUID = 1L;
- }
-
- /**
- * A special exception thrown by the Handover in the {@link #produce(ConsumerRecords)}
- * method when the producer is woken up from a blocking call via {@link #wakeupProducer()}.
- */
- public static final class WakeupException extends Exception {
- private static final long serialVersionUID = 1L;
- }
-}
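
For context, the Handover carries record buffers from a Kafka fetcher thread
to the main consuming thread. A minimal sketch of that protocol, using only
the methods shown above; 'kafkaConsumer', 'process', and 'running' are
illustrative assumptions:

    final Handover handover = new Handover();

    Thread fetcher = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                while (true) {
                    // blocks until the consumer thread has taken the
                    // previously handed-over buffer
                    handover.produce(kafkaConsumer.poll(100));
                }
            } catch (Handover.WakeupException | Handover.ClosedException e) {
                // woken up via wakeupProducer() or closed: leave the loop
            } catch (Throwable t) {
                // surfaces on the consumer's next pollNext() call
                handover.reportError(t);
            }
        }
    });
    fetcher.start();

    try {
        while (running) {
            // throws immediately if the fetcher reported an error
            process(handover.pollNext());
        }
    } finally {
        handover.close();  // unblocks the fetcher with a ClosedException
    }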