Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:25 UTC
[35/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..6235449
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
+import kafka.common.KafkaException;
+import kafka.network.SocketServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 0.8.
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+ private File tmpZkDir;
+ private File tmpKafkaParent;
+ private List<File> tmpKafkaDirs;
+ private List<KafkaServer> brokers;
+ private TestingServer zookeeper;
+ private String zookeeperConnectionString;
+ private String brokerConnectionString = "";
+ private Properties standardProps;
+ private Properties additionalServerProperties;
+
+ public String getBrokerConnectionString() {
+ return brokerConnectionString;
+ }
+
+ @Override
+ public Properties getStandardProperties() {
+ return standardProps;
+ }
+
+ @Override
+ public Properties getSecureProperties() {
+ return null;
+ }
+
+ @Override
+ public String getVersion() {
+ return "0.8";
+ }
+
+ @Override
+ public List<KafkaServer> getBrokers() {
+ return brokers;
+ }
+
+ @Override
+ public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
+ return new FlinkKafkaConsumer08<>(topics, readSchema, props);
+ }
+
+ @Override
+ public <T> StreamSink<T> getProducerSink(
+ String topic,
+ KeyedSerializationSchema<T> serSchema,
+ Properties props,
+ KafkaPartitioner<T> partitioner) {
+ FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(
+ topic,
+ serSchema,
+ props,
+ partitioner);
+ prod.setFlushOnCheckpoint(true);
+ return new StreamSink<>(prod);
+ }
+
+ @Override
+ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+ FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
+ prod.setFlushOnCheckpoint(true);
+ return stream.addSink(prod);
+ }
+
+ @Override
+ public KafkaOffsetHandler createOffsetHandler(Properties props) {
+ return new KafkaOffsetHandlerImpl(props);
+ }
+
+ @Override
+ public void restartBroker(int leaderId) throws Exception {
+ brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
+ }
+
+ @Override
+ public int getLeaderToShutDown(String topic) throws Exception {
+ ZkClient zkClient = createZkClient();
+ PartitionMetadata firstPart = null;
+ do {
+ if (firstPart != null) {
+ LOG.info("Unable to find leader. Error code: {}", firstPart.errorCode());
+ // not the first try. Sleep a bit
+ Thread.sleep(150);
+ }
+
+ Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+ firstPart = partitionMetadata.head();
+ }
+ while (firstPart.errorCode() != 0);
+ zkClient.close();
+
+ return firstPart.leader().get().id();
+ }
+
+ @Override
+ public int getBrokerId(KafkaServer server) {
+ return server.socketServer().brokerId();
+ }
+
+ @Override
+ public boolean isSecureRunSupported() {
+ return false;
+ }
+
+
+ @Override
+ public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+ this.additionalServerProperties = additionalServerProperties;
+ File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+ tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+ try {
+ Files.createDirectories(tmpZkDir.toPath());
+ } catch (IOException e) {
+ fail("cannot create zookeeper temp dir: " + e.getMessage());
+ }
+
+ tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir" + (UUID.randomUUID().toString()));
+ try {
+ Files.createDirectories(tmpKafkaParent.toPath());
+ } catch (IOException e) {
+ fail("cannot create kafka temp dir: " + e.getMessage());
+ }
+
+ tmpKafkaDirs = new ArrayList<>(numKafkaServers);
+ for (int i = 0; i < numKafkaServers; i++) {
+ File tmpDir = new File(tmpKafkaParent, "server-" + i);
+ assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+ tmpKafkaDirs.add(tmpDir);
+ }
+
+ zookeeper = null;
+ brokers = null;
+
+ try {
+ LOG.info("Starting Zookeeper");
+ zookeeper = new TestingServer(-1, tmpZkDir);
+ zookeeperConnectionString = zookeeper.getConnectString();
+
+ LOG.info("Starting KafkaServer");
+ brokers = new ArrayList<>(numKafkaServers);
+
+ for (int i = 0; i < numKafkaServers; i++) {
+ brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+ SocketServer socketServer = brokers.get(i).socketServer();
+
+ String host = socketServer.host() == null ? "localhost" : socketServer.host();
+ brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ",";
+ }
+
+ LOG.info("ZK and KafkaServer started.");
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ fail("Test setup failed: " + t.getMessage());
+ }
+
+ standardProps = new Properties();
+ standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+ standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+ standardProps.setProperty("group.id", "flink-tests");
+ standardProps.setProperty("auto.commit.enable", "false");
+ standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
+ standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
+ standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. (smallest is kafka 0.8)
+ standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+ }
+
+ @Override
+ public void shutdown() {
+ if (brokers != null) {
+ for (KafkaServer broker : brokers) {
+ if (broker != null) {
+ broker.shutdown();
+ }
+ }
+ brokers.clear();
+ }
+
+ if (zookeeper != null) {
+ try {
+ zookeeper.stop();
+ zookeeper.close();
+ }
+ catch (Exception e) {
+ LOG.warn("ZK.stop() failed", e);
+ }
+ zookeeper = null;
+ }
+
+ // clean up the temp spaces
+
+ if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+ try {
+ FileUtils.deleteDirectory(tmpKafkaParent);
+ }
+ catch (Exception e) {
+ // ignore
+ }
+ }
+ if (tmpZkDir != null && tmpZkDir.exists()) {
+ try {
+ FileUtils.deleteDirectory(tmpZkDir);
+ }
+ catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+
+ @Override
+ public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+ // create topic with one client
+ LOG.info("Creating topic {}", topic);
+
+ ZkClient creator = createZkClient();
+
+ AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig);
+ creator.close();
+
+ // validate that the topic has been created
+ final long deadline = System.currentTimeMillis() + 30000;
+ do {
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ // restore interrupted state
+ }
+ List<KafkaTopicPartitionLeader> partitions = FlinkKafkaConsumer08.getPartitionsForTopic(Collections.singletonList(topic), standardProps);
+ if (partitions != null && partitions.size() > 0) {
+ return;
+ }
+ }
+ while (System.currentTimeMillis() < deadline);
+ fail("Test topic could not be created");
+ }
+
+ @Override
+ public void deleteTestTopic(String topic) {
+ LOG.info("Deleting topic {}", topic);
+
+ ZkClient zk = createZkClient();
+ AdminUtils.deleteTopic(zk, topic);
+ zk.close();
+ }
+
+ private ZkClient createZkClient() {
+ return new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+ Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+ }
+
+ /**
+ * Only for the 0.8 server we need access to the zk client.
+ */
+ public CuratorFramework createCuratorClient() {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
+ CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy);
+ curatorClient.start();
+ return curatorClient;
+ }
+
+ /**
+ * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+ */
+ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+ LOG.info("Starting broker with id {}", brokerId);
+ Properties kafkaProperties = new Properties();
+
+ // properties have to be Strings
+ kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+ kafkaProperties.put("broker.id", Integer.toString(brokerId));
+ kafkaProperties.put("log.dir", tmpFolder.toString());
+ kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+ kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+ kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+
+ // for CI stability, increase zookeeper session timeout
+ kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
+ kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
+ if (additionalServerProperties != null) {
+ kafkaProperties.putAll(additionalServerProperties);
+ }
+
+ final int numTries = 5;
+
+ for (int i = 1; i <= numTries; i++) {
+ int kafkaPort = NetUtils.getAvailablePort();
+ kafkaProperties.put("port", Integer.toString(kafkaPort));
+ KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+ try {
+ KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
+ server.startup();
+ return server;
+ }
+ catch (KafkaException e) {
+ if (e.getCause() instanceof BindException) {
+ // port conflict, retry...
+ LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+ }
+ else {
+ throw e;
+ }
+ }
+ }
+
+ throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+ }
+
+ private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+ private final CuratorFramework offsetClient;
+ private final String groupId;
+
+ public KafkaOffsetHandlerImpl(Properties props) {
+ offsetClient = createCuratorClient();
+ groupId = props.getProperty("group.id");
+ }
+
+ @Override
+ public Long getCommittedOffset(String topicName, int partition) {
+ try {
+ return ZookeeperOffsetHandler.getOffsetFromZooKeeper(offsetClient, groupId, topicName, partition);
+ } catch (Exception e) {
+ throw new RuntimeException("Exception when getting offsets from Zookeeper", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ offsetClient.close();
+ }
+ }
+
+}
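The lifecycle of the test environment above (prepare, createTestTopic, getStandardProperties, deleteTestTopic, shutdown) is easiest to see in a small driver. The following is a hypothetical sketch, not part of this commit; only the methods shown in the diff are assumed, and the class and topic names are placeholders.

// Hypothetical driver for the Kafka 0.8 test environment above (not part of this commit).
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;

public class KafkaTestEnvironmentUsageSketch {
    public static void main(String[] args) throws Exception {
        KafkaTestEnvironmentImpl kafkaEnv = new KafkaTestEnvironmentImpl();

        // starts an embedded ZooKeeper and two Kafka 0.8 brokers in temporary directories
        kafkaEnv.prepare(2, null, false);
        try {
            // creates a topic and waits until its partitions are visible (fails after 30 seconds)
            kafkaEnv.createTestTopic("sketch-topic", 1, 1, new Properties());

            // the standard properties point clients at the embedded brokers and ZooKeeper
            Properties props = kafkaEnv.getStandardProperties();
            System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));

            kafkaEnv.deleteTestTopic("sketch-topic");
        } finally {
            // shuts down brokers and ZooKeeper and removes the temporary directories
            kafkaEnv.shutdown();
        }
    }
}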
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
new file mode 100644
index 0000000..6298c92
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+
+public class ClosableBlockingQueueTest {
+
+ // ------------------------------------------------------------------------
+ // single-threaded unit tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testCreateQueueHashCodeEquals() {
+ try {
+ ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>();
+ ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>(22);
+
+ assertTrue(queue1.isOpen());
+ assertTrue(queue2.isOpen());
+ assertTrue(queue1.isEmpty());
+ assertTrue(queue2.isEmpty());
+ assertEquals(0, queue1.size());
+ assertEquals(0, queue2.size());
+
+ assertTrue(queue1.hashCode() == queue2.hashCode());
+ //noinspection EqualsWithItself
+ assertTrue(queue1.equals(queue1));
+ //noinspection EqualsWithItself
+ assertTrue(queue2.equals(queue2));
+ assertTrue(queue1.equals(queue2));
+
+ assertNotNull(queue1.toString());
+ assertNotNull(queue2.toString());
+
+ List<String> elements = new ArrayList<>();
+ elements.add("a");
+ elements.add("b");
+ elements.add("c");
+
+ ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>(elements);
+ ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>(asList("a", "b", "c"));
+
+ assertTrue(queue3.isOpen());
+ assertTrue(queue4.isOpen());
+ assertFalse(queue3.isEmpty());
+ assertFalse(queue4.isEmpty());
+ assertEquals(3, queue3.size());
+ assertEquals(3, queue4.size());
+
+ assertTrue(queue3.hashCode() == queue4.hashCode());
+ //noinspection EqualsWithItself
+ assertTrue(queue3.equals(queue3));
+ //noinspection EqualsWithItself
+ assertTrue(queue4.equals(queue4));
+ assertTrue(queue3.equals(queue4));
+
+ assertNotNull(queue3.toString());
+ assertNotNull(queue4.toString());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCloseEmptyQueue() {
+ try {
+ ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
+ assertTrue(queue.isOpen());
+ assertTrue(queue.close());
+ assertFalse(queue.isOpen());
+
+ assertFalse(queue.addIfOpen("element"));
+ assertTrue(queue.isEmpty());
+
+ try {
+ queue.add("some element");
+ fail("should cause an exception");
+ } catch (IllegalStateException ignored) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCloseNonEmptyQueue() {
+ try {
+ ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>(asList(1, 2, 3));
+ assertTrue(queue.isOpen());
+
+ assertFalse(queue.close());
+ assertFalse(queue.close());
+
+ queue.poll();
+
+ assertFalse(queue.close());
+ assertFalse(queue.close());
+
+ queue.pollBatch();
+
+ assertTrue(queue.close());
+ assertFalse(queue.isOpen());
+
+ assertFalse(queue.addIfOpen(42));
+ assertTrue(queue.isEmpty());
+
+ try {
+ queue.add(99);
+ fail("should cause an exception");
+ } catch (IllegalStateException ignored) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPeekAndPoll() {
+ try {
+ ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
+
+ assertNull(queue.peek());
+ assertNull(queue.peek());
+ assertNull(queue.poll());
+ assertNull(queue.poll());
+
+ assertEquals(0, queue.size());
+
+ queue.add("a");
+ queue.add("b");
+ queue.add("c");
+
+ assertEquals(3, queue.size());
+
+ assertEquals("a", queue.peek());
+ assertEquals("a", queue.peek());
+ assertEquals("a", queue.peek());
+
+ assertEquals(3, queue.size());
+
+ assertEquals("a", queue.poll());
+ assertEquals("b", queue.poll());
+
+ assertEquals(1, queue.size());
+
+ assertEquals("c", queue.peek());
+ assertEquals("c", queue.peek());
+
+ assertEquals("c", queue.poll());
+
+ assertEquals(0, queue.size());
+ assertNull(queue.poll());
+ assertNull(queue.peek());
+ assertNull(queue.peek());
+
+ assertTrue(queue.close());
+
+ try {
+ queue.peek();
+ fail("should cause an exception");
+ } catch (IllegalStateException ignored) {
+ // expected
+ }
+
+ try {
+ queue.poll();
+ fail("should cause an exception");
+ } catch (IllegalStateException ignored) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPollBatch() {
+ try {
+ ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
+
+ assertNull(queue.pollBatch());
+
+ queue.add("a");
+ queue.add("b");
+
+ assertEquals(asList("a", "b"), queue.pollBatch());
+ assertNull(queue.pollBatch());
+
+ queue.add("c");
+
+ assertEquals(singletonList("c"), queue.pollBatch());
+ assertNull(queue.pollBatch());
+
+ assertTrue(queue.close());
+
+ try {
+ queue.pollBatch();
+ fail("should cause an exception");
+ } catch (IllegalStateException ignored) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGetElementBlocking() {
+ try {
+ ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
+
+ assertNull(queue.getElementBlocking(1));
+ assertNull(queue.getElementBlocking(3));
+ assertNull(queue.getElementBlocking(2));
+
+ assertEquals(0, queue.size());
+
+ queue.add("a");
+ queue.add("b");
+ queue.add("c");
+ queue.add("d");
+ queue.add("e");
+ queue.add("f");
+
+ assertEquals(6, queue.size());
+
+ assertEquals("a", queue.getElementBlocking(99));
+ assertEquals("b", queue.getElementBlocking());
+
+ assertEquals(4, queue.size());
+
+ assertEquals("c", queue.getElementBlocking(0));
+ assertEquals("d", queue.getElementBlocking(1000000));
+ assertEquals("e", queue.getElementBlocking());
+ assertEquals("f", queue.getElementBlocking(1786598));
+
+ assertEquals(0, queue.size());
+
+ assertNull(queue.getElementBlocking(1));
+ assertNull(queue.getElementBlocking(3));
+ assertNull(queue.getElementBlocking(2));
+
+ assertTrue(queue.close());
+
+ try {
+ queue.getElementBlocking();
+ fail("should cause an exception");
+ } catch (IllegalStateException ignored) {
+ // expected
+ }
+
+ try {
+ queue.getElementBlocking(1000000000L);
+ fail("should cause an exception");
+ } catch (IllegalStateException ignored) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGetBatchBlocking() {
+ try {
+ ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
+
+ assertEquals(emptyList(), queue.getBatchBlocking(1));
+ assertEquals(emptyList(), queue.getBatchBlocking(3));
+ assertEquals(emptyList(), queue.getBatchBlocking(2));
+
+ queue.add("a");
+ queue.add("b");
+
+ assertEquals(asList("a", "b"), queue.getBatchBlocking(900000009));
+
+ queue.add("c");
+ queue.add("d");
+
+ assertEquals(asList("c", "d"), queue.getBatchBlocking());
+
+ assertEquals(emptyList(), queue.getBatchBlocking(2));
+
+ queue.add("e");
+
+ assertEquals(singletonList("e"), queue.getBatchBlocking(0));
+
+ queue.add("f");
+
+ assertEquals(singletonList("f"), queue.getBatchBlocking(1000000000));
+
+ assertEquals(0, queue.size());
+
+ assertEquals(emptyList(), queue.getBatchBlocking(1));
+ assertEquals(emptyList(), queue.getBatchBlocking(3));
+ assertEquals(emptyList(), queue.getBatchBlocking(2));
+
+ assertTrue(queue.close());
+
+ try {
+ queue.getBatchBlocking();
+ fail("should cause an exception");
+ } catch (IllegalStateException ignored) {
+ // expected
+ }
+
+ try {
+ queue.getBatchBlocking(1000000000L);
+ fail("should cause an exception");
+ } catch (IllegalStateException ignored) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // multi-threaded tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void notifyOnClose() {
+ try {
+ final long oneYear = 365L * 24 * 60 * 60 * 1000;
+
+ // test "getBatchBlocking()"
+ final ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>();
+ QueueCall call1 = new QueueCall() {
+ @Override
+ public void call() throws Exception {
+ queue1.getBatchBlocking();
+ }
+ };
+ testCallExitsOnClose(call1, queue1);
+
+ // test "getBatchBlocking(timeout)"
+ final ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>();
+ QueueCall call2 = new QueueCall() {
+ @Override
+ public void call() throws Exception {
+ queue2.getBatchBlocking(oneYear);
+ }
+ };
+ testCallExitsOnClose(call2, queue2);
+
+ // test "getElementBlocking()"
+ final ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>();
+ QueueCall call3 = new QueueCall() {
+ @Override
+ public void call() throws Exception {
+ queue3.getElementBlocking();
+ }
+ };
+ testCallExitsOnClose(call3, queue3);
+
+ // test "getElementBlocking(timeout)"
+ final ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>();
+ QueueCall call4 = new QueueCall() {
+ @Override
+ public void call() throws Exception {
+ queue4.getElementBlocking(oneYear);
+ }
+ };
+ testCallExitsOnClose(call4, queue4);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ @Test
+ public void testMultiThreadedAddGet() {
+ try {
+ final ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>();
+ final AtomicReference<Throwable> pushErrorRef = new AtomicReference<>();
+ final AtomicReference<Throwable> pollErrorRef = new AtomicReference<>();
+
+ final int numElements = 2000;
+
+ Thread pusher = new Thread("pusher") {
+
+ @Override
+ public void run() {
+ try {
+ final Random rnd = new Random();
+ for (int i = 0; i < numElements; i++) {
+ queue.add(i);
+
+ // sleep a bit, sometimes
+ int sleepTime = rnd.nextInt(3);
+ if (sleepTime > 1) {
+ Thread.sleep(sleepTime);
+ }
+ }
+
+ while (true) {
+ if (queue.close()) {
+ break;
+ } else {
+ Thread.sleep(5);
+ }
+ }
+ } catch (Throwable t) {
+ pushErrorRef.set(t);
+ }
+ }
+ };
+ pusher.start();
+
+ Thread poller = new Thread("poller") {
+
+ @SuppressWarnings("InfiniteLoopStatement")
+ @Override
+ public void run() {
+ try {
+ int count = 0;
+
+ try {
+ final Random rnd = new Random();
+ int nextExpected = 0;
+
+ while (true) {
+ int getMethod = count % 7;
+ switch (getMethod) {
+ case 0: {
+ Integer next = queue.getElementBlocking(1);
+ if (next != null) {
+ assertEquals(nextExpected, next.intValue());
+ nextExpected++;
+ count++;
+ }
+ break;
+ }
+ case 1: {
+ List<Integer> nextList = queue.getBatchBlocking();
+ for (Integer next : nextList) {
+ assertNotNull(next);
+ assertEquals(nextExpected, next.intValue());
+ nextExpected++;
+ count++;
+ }
+ break;
+ }
+ case 2: {
+ List<Integer> nextList = queue.getBatchBlocking(1);
+ if (nextList != null) {
+ for (Integer next : nextList) {
+ assertNotNull(next);
+ assertEquals(nextExpected, next.intValue());
+ nextExpected++;
+ count++;
+ }
+ }
+ break;
+ }
+ case 3: {
+ Integer next = queue.poll();
+ if (next != null) {
+ assertEquals(nextExpected, next.intValue());
+ nextExpected++;
+ count++;
+ }
+ break;
+ }
+ case 4: {
+ List<Integer> nextList = queue.pollBatch();
+ if (nextList != null) {
+ for (Integer next : nextList) {
+ assertNotNull(next);
+ assertEquals(nextExpected, next.intValue());
+ nextExpected++;
+ count++;
+ }
+ }
+ break;
+ }
+ default: {
+ Integer next = queue.getElementBlocking();
+ assertNotNull(next);
+ assertEquals(nextExpected, next.intValue());
+ nextExpected++;
+ count++;
+ }
+ }
+
+ // sleep a bit, sometimes
+ int sleepTime = rnd.nextInt(3);
+ if (sleepTime > 1) {
+ Thread.sleep(sleepTime);
+ }
+ }
+ } catch (IllegalStateException e) {
+ // we get this once the queue is closed
+ assertEquals(numElements, count);
+ }
+ } catch (Throwable t) {
+ pollErrorRef.set(t);
+ }
+ }
+ };
+ poller.start();
+
+ pusher.join();
+ poller.join();
+
+ if (pushErrorRef.get() != null) {
+ Throwable t = pushErrorRef.get();
+ t.printStackTrace();
+ fail("Error in pusher: " + t.getMessage());
+ }
+ if (pollErrorRef.get() != null) {
+ Throwable t = pollErrorRef.get();
+ t.printStackTrace();
+ fail("Error in poller: " + t.getMessage());
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utils
+ // ------------------------------------------------------------------------
+
+ private static void testCallExitsOnClose(
+ final QueueCall call, ClosableBlockingQueue<String> queue) throws Exception {
+
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ call.call();
+ } catch (Throwable t) {
+ errorRef.set(t);
+ }
+ }
+ };
+
+ Thread thread = new Thread(runnable);
+ thread.start();
+ Thread.sleep(100);
+ queue.close();
+ thread.join();
+
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ Throwable cause = errorRef.get();
+ assertTrue(cause instanceof IllegalStateException);
+ }
+
+ private interface QueueCall {
+ void call() throws Exception;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..fbeb110
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml
new file mode 100644
index 0000000..3894499
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -0,0 +1,212 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
+ <name>flink-connector-kafka-0.9</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <kafka.version>0.9.0.1</kafka.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-base_2.10</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <!-- Projects depending on this project
+ won't depend on flink-table. -->
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-base_2.10</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <!-- exclude 0.8 dependencies -->
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <!-- include 0.9 server for tests -->
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-jmx</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <version>${minikdc.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>**/KafkaTestEnvironmentImpl*</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-test-sources</id>
+ <goals>
+ <goal>test-jar-no-fork</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>**/KafkaTestEnvironmentImpl*</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+ <forkCount>1</forkCount>
+ <argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
+ </plugin>
+ <!--
+ https://issues.apache.org/jira/browse/DIRSHARED-134
+ Required to pull the Mini-KDC transitive dependency
+ -->
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>3.0.1</version>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
new file mode 100644
index 0000000..29bb8e4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka 0.9.x. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions.
+ *
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once".
+ * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
+ *
+ * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
+ * is constructed. That means that the client that submits the program needs to be able to
+ * reach the Kafka brokers or ZooKeeper.</p>
+ */
+public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
+
+ private static final long serialVersionUID = 2324564345203409112L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
+
+ /** Configuration key to change the polling timeout **/
+ public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
+
+
+ /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+ * available. If 0, returns immediately with any records that are available now. */
+ public static final long DEFAULT_POLL_TIMEOUT = 100L;
+
+ // ------------------------------------------------------------------------
+
+ /** User-supplied properties for Kafka **/
+ protected final Properties properties;
+
+ /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+ * available. If 0, returns immediately with any records that are available now */
+ protected final long pollTimeout;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+ *
+ * @param topic
+ * The name of the topic that should be consumed.
+ * @param valueDeserializer
+ * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+ this(Collections.singletonList(topic), valueDeserializer, props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+ *
+ * This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+ * pairs, offsets, and topic names from Kafka.
+ *
+ * @param topic
+ * The name of the topic that should be consumed.
+ * @param deserializer
+ * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
+ this(Collections.singletonList(topic), deserializer, props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+ *
+ * This constructor allows passing multiple topics to the consumer.
+ *
+ * @param topics
+ * The Kafka topics to read from.
+ * @param deserializer
+ * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties that are used to configure both the fetcher and the offset handler.
+ */
+ public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
+ this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+ *
+ * This constructor allows passing multiple topics and a key/value deserialization schema.
+ *
+ * @param topics
+ * The Kafka topics to read from.
+ * @param deserializer
+ * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties that are used to configure both the fetcher and the offset handler.
+ */
+ public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+ super(topics, deserializer);
+
+ this.properties = checkNotNull(props, "props");
+ setDeserializer(this.properties);
+
+ // configure the polling timeout
+ try {
+ if (properties.containsKey(KEY_POLL_TIMEOUT)) {
+ this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
+ } else {
+ this.pollTimeout = DEFAULT_POLL_TIMEOUT;
+ }
+ }
+ catch (Exception e) {
+ throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
+ }
+ }
+
+ @Override
+ protected AbstractFetcher<T, ?> createFetcher(
+ SourceContext<T> sourceContext,
+ List<KafkaTopicPartition> thisSubtaskPartitions,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ StreamingRuntimeContext runtimeContext) throws Exception {
+
+ boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
+
+ return new Kafka09Fetcher<>(
+ sourceContext,
+ thisSubtaskPartitions,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ runtimeContext.getProcessingTimeService(),
+ runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+ runtimeContext.getUserCodeClassLoader(),
+ runtimeContext.isCheckpointingEnabled(),
+ runtimeContext.getTaskNameWithSubtasks(),
+ runtimeContext.getMetricGroup(),
+ deserializer,
+ properties,
+ pollTimeout,
+ useMetrics);
+
+ }
+
+ @Override
+ protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
+ // read the partitions that belong to the listed topics
+ final List<KafkaTopicPartition> partitions = new ArrayList<>();
+
+ try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
+ for (final String topic: topics) {
+ // get partitions for each topic
+ List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
+ // for non existing topics, the list might be null.
+ if (partitionsForTopic != null) {
+ partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
+ }
+ }
+ }
+
+ if (partitions.isEmpty()) {
+ throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
+ }
+
+ // we now have a list of partitions which is the same for all parallel consumer instances.
+ LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics);
+
+ if (LOG.isInfoEnabled()) {
+ logPartitionInfo(LOG, partitions);
+ }
+
+ return partitions;
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Converts a list of Kafka PartitionInfos into Flink's KafkaTopicPartitions (which are serializable).
+ *
+ * @param partitions A list of Kafka PartitionInfos.
+ * @return A list of KafkaTopicPartitions
+ */
+ private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
+ checkNotNull(partitions);
+
+ List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size());
+ for (PartitionInfo pi : partitions) {
+ ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
+ }
+ return ret;
+ }
+
+ /**
+ * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
+ *
+ * @param props The Kafka properties to register the serializer in.
+ */
+ private static void setDeserializer(Properties props) {
+ final String deSerName = ByteArrayDeserializer.class.getCanonicalName();
+
+ Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+ if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
+ LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ }
+ if (valDeSer != null && !valDeSer.equals(deSerName)) {
+ LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ }
+
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
+ }
+}
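The consumer Javadoc above describes checkpointed offsets and the "flink.poll-timeout" key; a minimal sketch of wiring the consumer into a streaming job could look like the following. It is not part of this commit; the broker address, group id, and topic name are placeholders, and SimpleStringSchema stands in for an application-specific deserialization schema.

// Hypothetical sketch: wiring FlinkKafkaConsumer09 into a streaming job (not part of this commit).
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka09ConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // offsets are snapshotted as part of Flink's checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "example-group");
        // optional: shorten the poll timeout discussed above
        props.setProperty(FlinkKafkaConsumer09.KEY_POLL_TIMEOUT, "50");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer09<>("example-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("Kafka 0.9 consumer sketch");
    }
}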
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
new file mode 100644
index 0000000..2a3e39d
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.9.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ // ------------------- Keyless serialization schema constructors ----------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param brokerList
+ * Comma separated addresses of the brokers
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined (keyless) serialization schema.
+ */
+ public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined (keyless) serialization schema.
+ * @param producerConfig
+ * Properties with the producer configuration.
+ */
+ public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+ */
+ public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+ }
+
+ // ------------------- Key/Value serialization schema constructors ----------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param brokerList
+ * Comma separated addresses of the brokers
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined serialization schema supporting key/value messages
+ */
+ public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+ this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined serialization schema supporting key/value messages
+ * @param producerConfig
+ * Properties with the producer configuration.
+ */
+ public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+ this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required property.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+ public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ super(topicId, serializationSchema, producerConfig, customPartitioner);
+ }
+
+ @Override
+ protected void flush() {
+ if (this.producer != null) {
+ producer.flush();
+ }
+ }
+}
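
As a hedged usage sketch of the constructors above (not part of the diff): the topic name,
broker address, and job name below are assumptions for illustration only.

    import java.util.Properties;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ProducerExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address

            // keyless variant: the SerializationSchema is wrapped and a FixedPartitioner is used
            env.fromElements("a", "b", "c")
               .addSink(new FlinkKafkaProducer09<String>("my-topic", new SimpleStringSchema(), props));

            env.execute("kafka-producer-example");
        }
    }
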
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
new file mode 100644
index 0000000..38ea47c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public class Kafka09JsonTableSink extends KafkaJsonTableSink {
+ /**
+ * Creates {@link KafkaTableSink} for Kafka 0.9
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ * @param partitioner Kafka partitioner
+ */
+ public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+ super(topic, properties, partitioner);
+ }
+
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+ return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+ }
+
+ @Override
+ protected Kafka09JsonTableSink createCopy() {
+ return new Kafka09JsonTableSink(topic, properties, partitioner);
+ }
+}
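
A hedged construction sketch for this sink (topic name and broker address are illustrative;
FixedPartitioner is the same default partitioner the producer constructors fall back to):

    import java.util.Properties;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address

    KafkaTableSink sink = new Kafka09JsonTableSink(
            "json-topic",                     // illustrative topic
            props,
            new FixedPartitioner<Row>());     // each parallel subtask writes to one fixed partition

    // The sink is then handed to the Table API for emitting a Table, e.g. via
    // table.writeToSink(sink), where that method is available in this Flink version.
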
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
new file mode 100644
index 0000000..975ef58
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.9.
+ */
+public class Kafka09JsonTableSource extends KafkaJsonTableSource {
+
+ /**
+ * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka09JsonTableSource(
+ String topic,
+ Properties properties,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ super(topic, properties, fieldNames, fieldTypes);
+ }
+
+ /**
+ * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka09JsonTableSource(
+ String topic,
+ Properties properties,
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ super(topic, properties, fieldNames, fieldTypes);
+ }
+
+ @Override
+ FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+ return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
+ }
+}
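
A construction sketch for the JSON source, assuming two JSON fields ("id" and "name");
registration with a table environment is only hinted at, since the Table API itself is not
part of this diff:

    import java.util.Properties;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
    props.setProperty("group.id", "table-source-example");    // assumed consumer group

    String[] fieldNames = { "id", "name" };
    TypeInformation<?>[] fieldTypes = { BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO };

    Kafka09JsonTableSource source =
            new Kafka09JsonTableSource("json-topic", props, fieldNames, fieldTypes);

    // e.g. tableEnv.registerTableSource("kafkaJson", source) on a StreamTableEnvironment,
    // where such a registration method exists in this Flink version.
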
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
new file mode 100644
index 0000000..03b5040
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.9.
+ */
+public class Kafka09TableSource extends KafkaTableSource {
+
+ /**
+ * Creates a Kafka 0.9 {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka09TableSource(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+ }
+
+ /**
+ * Creates a Kafka 0.9 {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka09TableSource(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+ }
+
+ @Override
+ FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+ return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
+ }
+}
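
The generic variant differs from the JSON one only in that the caller supplies the
DeserializationSchema<Row>. A sketch, where MyRowDeserializationSchema is a hypothetical
user-defined schema and props is configured as in the previous example:

    DeserializationSchema<Row> schema = new MyRowDeserializationSchema(); // hypothetical user class

    Kafka09TableSource source = new Kafka09TableSource(
            "events",                                 // illustrative topic
            props,
            schema,
            new String[] { "id", "payload" },
            new TypeInformation<?>[] { BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO });
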
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
new file mode 100644
index 0000000..e6e3c51
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Handover is a utility to hand over data (a buffer of records) and exceptions from a
+ * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a
+ * "size one" blocking queue, with some extras around exception reporting, closing, and
+ * waking up threads without {@link Thread#interrupt() interrupting} them.
+ *
+ * <p>This class is used in the Flink Kafka Consumer to hand over data and exceptions between
+ * the thread that runs the KafkaConsumer class and the main thread.
+ *
+ * <p>The Handover has the notion of "waking up" the producer thread with a {@link WakeupException}
+ * rather than a thread interrupt.
+ *
+ * <p>The Handover can also be "closed", signalling from one thread to the other that it
+ * has terminated.
+ */
+@ThreadSafe
+public final class Handover implements Closeable {
+
+ private final Object lock = new Object();
+
+ private ConsumerRecords<byte[], byte[]> next;
+ private Throwable error;
+ private boolean wakeupProducer;
+
+ /**
+ * Polls the next element from the Handover, possibly blocking until the next element is
+ * available. This method behaves similarly to polling from a blocking queue.
+ *
+ * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then
+ * that exception is thrown rather than an element being returned.
+ *
+ * @return The next element (buffer of records, never null).
+ *
+ * @throws ClosedException Thrown if the Handover was {@link #close() closed}.
+ * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
+ */
+ @Nonnull
+ public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
+ synchronized (lock) {
+ while (next == null && error == null) {
+ lock.wait();
+ }
+
+ ConsumerRecords<byte[], byte[]> n = next;
+ if (n != null) {
+ next = null;
+ lock.notifyAll();
+ return n;
+ }
+ else {
+ ExceptionUtils.rethrowException(error, error.getMessage());
+
+ // this statement cannot be reached since the above method always throws an exception
+ // this is only here to silence the compiler and any warnings
+ return ConsumerRecords.empty();
+ }
+ }
+ }
+
+ /**
+ * Hands over an element from the producer. If the Handover already has an element that was
+ * not yet picked up by the consumer thread, this call blocks until the consumer picks up that
+ * previous element.
+ *
+ * <p>This behavior is similar to a "size one" blocking queue.
+ *
+ * @param element The next element to hand over.
+ *
+ * @throws InterruptedException
+ * Thrown, if the thread is interrupted while blocking for the Handover to be empty.
+ * @throws WakeupException
+ * Thrown, if the {@link #wakeupProducer()} method is called while blocking for
+ * the Handover to be empty.
+ * @throws ClosedException
+ * Thrown if the Handover was closed or concurrently being closed.
+ */
+ public void produce(final ConsumerRecords<byte[], byte[]> element)
+ throws InterruptedException, WakeupException, ClosedException {
+
+ checkNotNull(element);
+
+ synchronized (lock) {
+ while (next != null && !wakeupProducer) {
+ lock.wait();
+ }
+
+ wakeupProducer = false;
+
+ // if there is still an element, we must have been woken up
+ if (next != null) {
+ throw new WakeupException();
+ }
+ // if there is no error, then this is open and can accept this element
+ else if (error == null) {
+ next = element;
+ lock.notifyAll();
+ }
+ // an error marks this as closed for the producer
+ else {
+ throw new ClosedException();
+ }
+ }
+ }
+
+ /**
+ * Reports an exception. The consumer will throw the given exception immediately, if
+ * it is currently blocked in the {@link #pollNext()} method, or the next time it
+ * calls that method.
+ *
+ * <p>After this method has been called, no call to either {@link #produce(ConsumerRecords)}
+ * or {@link #pollNext()} will ever return normally again; both will always complete
+ * exceptionally.
+ *
+ * <p>If another exception was already reported, this method does nothing.
+ *
+ * <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
+ *
+ * @param t The exception to report.
+ */
+ public void reportError(Throwable t) {
+ checkNotNull(t);
+
+ synchronized (lock) {
+ // do not override the initial exception
+ if (error == null) {
+ error = t;
+ }
+ next = null;
+ lock.notifyAll();
+ }
+ }
+
+ /**
+ * Closes the handover. Both the {@link #produce(ConsumerRecords)} method and the
+ * {@link #pollNext()} will throw a {@link ClosedException} on any currently blocking and
+ * future invocations.
+ *
+ * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
+ * that exception will not be overridden. The consumer thread will throw that exception upon
+ * calling {@link #pollNext()}, rather than the {@code ClosedException}.
+ */
+ @Override
+ public void close() {
+ synchronized (lock) {
+ next = null;
+ wakeupProducer = false;
+
+ if (error == null) {
+ error = new ClosedException();
+ }
+ lock.notifyAll();
+ }
+ }
+
+ /**
+ * Wakes the producer thread up. If the producer thread is currently blocked in
+ * the {@link #produce(ConsumerRecords)} method, it will exit the method throwing
+ * a {@link WakeupException}.
+ */
+ public void wakeupProducer() {
+ synchronized (lock) {
+ wakeupProducer = true;
+ lock.notifyAll();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * An exception thrown by the Handover in the {@link #pollNext()} or
+ * {@link #produce(ConsumerRecords)} method, after the Handover was closed via
+ * {@link #close()}.
+ */
+ public static final class ClosedException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ * A special exception thrown by the Handover in the {@link #produce(ConsumerRecords)}
+ * method when the producer is woken up from a blocking call via {@link #wakeupProducer()}.
+ */
+ public static final class WakeupException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+}
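
The hand-over protocol described in the class comment can be illustrated with a small sketch.
fetchNextBatch() and process() are hypothetical helpers; in this connector the real producer
side is the KafkaConsumerThread and the consumer side is the fetcher's runFetchLoop().

    final Handover handover = new Handover();

    // producer thread: hands batches over, reports failures, reacts to wake-ups
    Thread producer = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                while (true) {
                    handover.produce(fetchNextBatch()); // blocks while the previous batch is pending
                }
            }
            catch (Handover.WakeupException e) {
                // woken up via wakeupProducer() while blocked in produce()
            }
            catch (Throwable t) {
                handover.reportError(t); // surfaces on the consumer's next pollNext() call
            }
        }
    });
    producer.start();

    // consumer (task) thread: polls batches until the Handover is closed or an error is reported
    try {
        while (true) {
            process(handover.pollNext());
        }
    }
    catch (Handover.ClosedException e) {
        // orderly shutdown, triggered by close()
    }
    catch (Exception e) {
        // an exception reported by the producer via reportError() is rethrown here
    }
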
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
new file mode 100644
index 0000000..d495327
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
+ *
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);
+
+ // ------------------------------------------------------------------------
+
+ /** The schema to convert between Kafka's byte messages and Flink's objects */
+ private final KeyedDeserializationSchema<T> deserializer;
+
+ /** The handover of data and exceptions between the consumer thread and the task thread */
+ private final Handover handover;
+
+ /** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher */
+ private final KafkaConsumerThread consumerThread;
+
+ /** Flag to mark the main work loop as alive */
+ private volatile boolean running = true;
+
+ // ------------------------------------------------------------------------
+
+ public Kafka09Fetcher(
+ SourceContext<T> sourceContext,
+ List<KafkaTopicPartition> assignedPartitions,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval,
+ ClassLoader userCodeClassLoader,
+ boolean enableCheckpointing,
+ String taskNameWithSubtasks,
+ MetricGroup metricGroup,
+ KeyedDeserializationSchema<T> deserializer,
+ Properties kafkaProperties,
+ long pollTimeout,
+ boolean useMetrics) throws Exception
+ {
+ super(
+ sourceContext,
+ assignedPartitions,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ processingTimeProvider,
+ autoWatermarkInterval,
+ userCodeClassLoader,
+ useMetrics);
+
+ this.deserializer = deserializer;
+ this.handover = new Handover();
+
+ final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
+ addOffsetStateGauge(kafkaMetricGroup);
+
+ // if checkpointing is enabled, we are not automatically committing to Kafka.
+ kafkaProperties.setProperty(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ Boolean.toString(!enableCheckpointing));
+
+ this.consumerThread = new KafkaConsumerThread(
+ LOG,
+ handover,
+ kafkaProperties,
+ subscribedPartitions(),
+ kafkaMetricGroup,
+ createCallBridge(),
+ getFetcherName() + " for " + taskNameWithSubtasks,
+ pollTimeout,
+ useMetrics);
+ }
+
+ // ------------------------------------------------------------------------
+ // Fetcher work methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void runFetchLoop() throws Exception {
+ try {
+ final Handover handover = this.handover;
+
+ // kick off the actual Kafka consumer
+ consumerThread.start();
+
+ while (running) {
+ // this blocks until we get the next records
+ // it automatically re-throws exceptions encountered in the fetcher thread
+ final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
+
+ // get the records for each topic partition
+ for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+
+ List<ConsumerRecord<byte[], byte[]>> partitionRecords =
+ records.records(partition.getKafkaPartitionHandle());
+
+ for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+
+ final T value = deserializer.deserialize(
+ record.key(), record.value(),
+ record.topic(), record.partition(), record.offset());
+
+ if (deserializer.isEndOfStream(value)) {
+ // end of stream signaled
+ running = false;
+ break;
+ }
+
+ // emit the actual record. this also updates offset state atomically
+ // and deals with timestamps and watermark generation
+ emitRecord(value, partition, record.offset(), record);
+ }
+ }
+ }
+ }
+ finally {
+ // this signals the consumer thread that no more work is to be done
+ consumerThread.shutdown();
+ }
+
+ // on a clean exit, wait for the runner thread
+ try {
+ consumerThread.join();
+ }
+ catch (InterruptedException e) {
+ // may be the result of a wake-up interruption after an exception.
+ // we ignore this here and only restore the interruption state
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // flag the main thread to exit. A thread interrupt will come anyway.
+ running = false;
+ handover.close();
+ consumerThread.shutdown();
+ }
+
+ // ------------------------------------------------------------------------
+ // The below methods are overridden in the 0.10 fetcher, which otherwise
+ // reuses most of the 0.9 fetcher behavior
+ // ------------------------------------------------------------------------
+
+ protected void emitRecord(
+ T record,
+ KafkaTopicPartitionState<TopicPartition> partition,
+ long offset,
+ @SuppressWarnings("UnusedParameters") ConsumerRecord<?, ?> consumerRecord) throws Exception {
+
+ // the 0.9 Fetcher does not try to extract a timestamp
+ emitRecord(record, partition, offset);
+ }
+
+ /**
+ * Gets the name of this fetcher, for thread naming and logging purposes.
+ */
+ protected String getFetcherName() {
+ return "Kafka 0.9 Fetcher";
+ }
+
+ protected KafkaConsumerCallBridge createCallBridge() {
+ return new KafkaConsumerCallBridge();
+ }
+
+ // ------------------------------------------------------------------------
+ // Implement Methods of the AbstractFetcher
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
+ return new TopicPartition(partition.getTopic(), partition.getPartition());
+ }
+
+ @Override
+ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
+
+ for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
+ Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
+ if (lastProcessedOffset != null) {
+ // offsets committed through the KafkaConsumer need to be 1 more than the last processed offset.
+ // This does not affect Flink's checkpoints/saved state.
+ long offsetToCommit = lastProcessedOffset + 1;
+
+ offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
+ partition.setCommittedOffset(offsetToCommit);
+ }
+ }
+
+ // record the work to be committed by the main consumer thread and make sure the consumer notices that
+ consumerThread.setOffsetsToCommit(offsetsToCommit);
+ }
+}
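
To make the commit convention above concrete, a small sketch of the per-partition translation
(the partition and offset values are illustrative):

    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

    long lastProcessedOffset = 42L; // offset of the last record emitted, from Flink's state
    offsetsToCommit.put(
            new TopicPartition("events", 0),                 // illustrative partition
            new OffsetAndMetadata(lastProcessedOffset + 1)); // Kafka reads a committed offset as "next record to consume"
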