[FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/
-import org.junit.Test;
-public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
-	/**
-	 * Delegates to {@code runAutoOffsetResetTest()} inherited from the test base class.
-	 * JUnit aborts the test if it runs longer than 60 seconds.
-	 */
-	@Test(timeout = 60000)
-	public void testAutoOffsetReset() throws Exception {
-		this.runAutoOffsetResetTest();
-	}
-	/**
-	 * Delegates to {@code runFailOnAutoOffsetResetNoneEager()} inherited from the test base class.
-	 * JUnit aborts the test if it runs longer than 60 seconds.
-	 */
-	@Test(timeout = 60000)
-	public void testAutoOffsetResetNone() throws Exception {
-		this.runFailOnAutoOffsetResetNoneEager();
-	}
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.common.KafkaException;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-/**
- * An implementation of the KafkaServerProvider for Kafka 0.8
- */
-public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
-	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
-	// Data directory handed to the embedded ZooKeeper TestingServer; created in prepare(), removed in shutdown().
-	private File tmpZkDir;
-	// Parent of all per-broker log directories; created in prepare(), removed in shutdown().
-	private File tmpKafkaParent;
-	// One log directory per broker; list index == broker id.
-	private List<File> tmpKafkaDirs;
-	// Running brokers; list index == broker id (restartBroker() replaces the entry at that index).
-	private List<KafkaServer> brokers;
-	// Embedded ZooKeeper instance started by prepare().
-	private TestingServer zookeeper;
-	// Connect string reported by the embedded ZooKeeper.
-	private String zookeeperConnectionString;
-	// Broker addresses appended in prepare(); each entry is followed by a comma, including the last one.
-	private String brokerConnectionString = "";
-	// Client properties assembled at the end of prepare().
-	private Properties standardProps;
-	// Extra broker settings passed to prepare(); applied on top of the defaults in getKafkaServer(). May be null.
-	private Properties additionalServerProperties;
-	/**
-	 * @return the broker address list accumulated while the brokers were started
-	 */
-	public String getBrokerConnectionString() {
-		return this.brokerConnectionString;
-	}
-	/**
-	 * @return the client properties built by {@code prepare}; the internal instance is
-	 *         returned directly, not a copy
-	 */
-	@Override
-	public Properties getStandardProperties() {
-		return this.standardProps;
-	}
-	/**
-	 * This environment has no secure properties.
-	 *
-	 * @return always {@code null}
-	 */
-	@Override
-	public Properties getSecureProperties() {
-		final Properties none = null;
-		return none;
-	}
-	/**
-	 * @return the Kafka version string of this environment, {@code "0.8"}
-	 */
-	@Override
-	public String getVersion() {
-		final String version = "0.8";
-		return version;
-	}
-	/**
-	 * @return the internal broker list (not a copy); {@code null} until {@code prepare} has run
-	 */
-	@Override
-	public List<KafkaServer> getBrokers() {
-		return this.brokers;
-	}
-	/**
-	 * Creates a Kafka 0.8 consumer for the given topics.
-	 *
-	 * @param topics topics to subscribe to
-	 * @param readSchema schema used to deserialize records
-	 * @param props consumer configuration
-	 * @return a new {@code FlinkKafkaConsumer08}
-	 */
-	@Override
-	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
-		FlinkKafkaConsumer08<T> consumer = new FlinkKafkaConsumer08<>(topics, readSchema, props);
-		return consumer;
-	}
-	/**
-	 * Builds a sink operator around a Kafka 0.8 producer that flushes on checkpoints.
-	 *
-	 * @param topic target topic
-	 * @param serSchema schema used to serialize records
-	 * @param props producer configuration
-	 * @param partitioner partitioner handed to the producer
-	 * @return a {@code StreamSink} wrapping the configured producer
-	 */
-	@Override
-	public <T> StreamSink<T> getProducerSink(
-			String topic,
-			KeyedSerializationSchema<T> serSchema,
-			Properties props,
-			KafkaPartitioner<T> partitioner) {
-		FlinkKafkaProducer08<T> producer = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
-		producer.setFlushOnCheckpoint(true);
-		StreamSink<T> sink = new StreamSink<>(producer);
-		return sink;
-	}
-	/**
-	 * Attaches a Kafka 0.8 producer, configured to flush on checkpoints, as a sink of the stream.
-	 *
-	 * @param stream stream whose records are written to Kafka
-	 * @param topic target topic
-	 * @param serSchema schema used to serialize records
-	 * @param props producer configuration
-	 * @param partitioner partitioner handed to the producer
-	 * @return the sink returned by {@code stream.addSink}
-	 */
-	@Override
-	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
-		FlinkKafkaProducer08<T> producer = new FlinkKafkaProducer08<>(
-				topic,
-				serSchema,
-				props,
-				partitioner);
-		producer.setFlushOnCheckpoint(true);
-		DataStreamSink<T> sink = stream.addSink(producer);
-		return sink;
-	}
-	@Override
-	public KafkaOffsetHandler createOffsetHandler(Properties props) {
-		return new KafkaOffsetHandlerImpl(props);
-	}
-	@Override
-	public void restartBroker(int leaderId) throws Exception {
-		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
-	}
-	@Override
-	public int getLeaderToShutDown(String topic) throws Exception {
-		ZkClient zkClient = createZkClient();
-		PartitionMetadata firstPart = null;
-		do {
-			if (firstPart != null) {
-"Unable to find leader. error code {}", firstPart.errorCode());
-				// not the first try. Sleep a bit
-				Thread.sleep(150);
-			}
-			Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
-			firstPart = partitionMetadata.head();
-		}
-		while (firstPart.errorCode() != 0);
-		zkClient.close();
-		return firstPart.leader().get().id();
-	}
-	/**
-	 * @param server a Kafka broker
-	 * @return the broker id reported by the server's socket server
-	 */
-	@Override
-	public int getBrokerId(KafkaServer server) {
-		final int brokerId = server.socketServer().brokerId();
-		return brokerId;
-	}
-	/**
-	 * @return always {@code false}: this environment does not support secure runs
-	 */
-	@Override
-	public boolean isSecureRunSupported() {
-		final boolean secureRunSupported = false;
-		return secureRunSupported;
-	}
-	/**
-	 * Sets up the test environment: creates temp directories, starts an embedded ZooKeeper
-	 * and the requested number of Kafka brokers, and assembles the standard client properties.
-	 *
-	 * <p>Any failure during setup fails the calling test via {@code fail(...)}.
-	 *
-	 * @param numKafkaServers number of brokers to start; broker ids are {@code 0..numKafkaServers-1}
-	 * @param additionalServerProperties extra broker settings, may be {@code null}
-	 * @param secureMode not used by this implementation
-	 */
-	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
-		this.additionalServerProperties = additionalServerProperties;
-		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
-		try {
-			Files.createDirectories(tmpZkDir.toPath());
-		} catch (IOException e) {
-			fail("cannot create zookeeper temp dir: " + e.getMessage());
-		}
-		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir" + (UUID.randomUUID().toString()));
-		try {
-			Files.createDirectories(tmpKafkaParent.toPath());
-		} catch (IOException e) {
-			fail("cannot create kafka temp dir: " + e.getMessage());
-		}
-		// one log directory per broker, index == broker id
-		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-		for (int i = 0; i < numKafkaServers; i++) {
-			File tmpDir = new File(tmpKafkaParent, "server-" + i);
-			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
-			tmpKafkaDirs.add(tmpDir);
-		}
-		zookeeper = null;
-		brokers = null;
-		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(numKafkaServers);
-			for (int i = 0; i < numKafkaServers; i++) {
-				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
-				SocketServer socketServer = brokers.get(i).socketServer();
-				// fall back to localhost when the broker reports no host
-				String host = socketServer.host() == null ? "localhost" : socketServer.host();
-				brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ",";
-			}
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
-		}
-		standardProps = new Properties();
-		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
-		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
-		standardProps.setProperty("group.id", "flink-tests");
-		standardProps.setProperty("auto.commit.enable", "false");
-		// the two ZooKeeper timeouts below are read back by createZkClient()
-		standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
-		standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
-		standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. (smallest is kafka 0.8)
-		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-	}
-	/**
-	 * Stops all brokers and the embedded ZooKeeper and deletes the temp directories.
-	 * Cleanup is best effort: failures are logged or ignored, never rethrown.
-	 */
-	@Override
-	public void shutdown() {
-		if (brokers != null) {
-			for (KafkaServer broker : brokers) {
-				if (broker != null) {
-					broker.shutdown();
-				}
-			}
-			brokers.clear();
-		}
-		if (zookeeper != null) {
-			try {
-				zookeeper.stop();
-				zookeeper.close();
-			}
-			catch (Exception e) {
-				LOG.warn("ZK.stop() failed", e);
-			}
-			zookeeper = null;
-		}
-		// clean up the temp spaces
-		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
-			try {
-				FileUtils.deleteDirectory(tmpKafkaParent);
-			}
-			catch (Exception e) {
-				// ignore: leftover temp files must not fail the test run
-			}
-		}
-		if (tmpZkDir != null && tmpZkDir.exists()) {
-			try {
-				FileUtils.deleteDirectory(tmpZkDir);
-			}
-			catch (Exception e) {
-				// ignore: leftover temp files must not fail the test run
-			}
-		}
-	}
-	/**
-	 * Creates a topic and waits up to 30 seconds until its partitions are visible to a
-	 * consumer using the standard properties. Fails the calling test if they never are.
-	 *
-	 * @param topic name of the topic to create
-	 * @param numberOfPartitions number of partitions
-	 * @param replicationFactor replication factor
-	 * @param topicConfig topic-level configuration
-	 */
-	@Override
-	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
-		// create topic with one client
-		LOG.info("Creating topic {}", topic);
-		ZkClient creator = createZkClient();
-		try {
-			AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig);
-		}
-		finally {
-			// release the ZooKeeper connection even if topic creation throws
-			creator.close();
-		}
-		// validate that the topic has been created
-		final long deadline = System.currentTimeMillis() + 30000;
-		do {
-			try {
-				Thread.sleep(100);
-			}
-			catch (InterruptedException e) {
-				// NOTE(review): the interrupt is swallowed and polling continues; the
-				// interrupted flag is NOT restored here.
-			}
-			List<KafkaTopicPartitionLeader> partitions = FlinkKafkaConsumer08.getPartitionsForTopic(Collections.singletonList(topic), standardProps);
-			if (partitions != null && partitions.size() > 0) {
-				return;
-			}
-		}
-		while (System.currentTimeMillis() < deadline);
-		fail ("Test topic could not be created");
-	}
-	/**
-	 * Deletes the given topic through a short-lived ZooKeeper client.
-	 *
-	 * @param topic name of the topic to delete
-	 */
-	@Override
-	public void deleteTestTopic(String topic) {
-		LOG.info("Deleting topic {}", topic);
-		ZkClient zk = createZkClient();
-		try {
-			AdminUtils.deleteTopic(zk, topic);
-		}
-		finally {
-			// release the ZooKeeper connection even if deletion throws
-			zk.close();
-		}
-	}
-	/**
-	 * Creates a ZooKeeper client for the embedded ZooKeeper, using the session and
-	 * connection timeouts stored in the standard properties by {@code prepare}.
-	 *
-	 * @return a new client; the caller must close it
-	 */
-	private ZkClient createZkClient() {
-		return new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
-				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
-	}
-	/**
-	 * Builds and starts a Curator client for the ZooKeeper address stored in the standard
-	 * properties. Direct access to the zk client is only needed for the 0.8 server.
-	 *
-	 * @return a started client; the caller must close it
-	 */
-	public CuratorFramework createCuratorClient() {
-		final String zkAddress = standardProps.getProperty("zookeeper.connect");
-		final RetryPolicy backoff = new ExponentialBackoffRetry(100, 10);
-		final CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, backoff);
-		client.start();
-		return client;
-	}
-	/**
-	 * Starts a Kafka broker with the given id on a free port, retrying up to five times
-	 * when the chosen port turns out to be taken.
-	 *
-	 * <p>Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-	 *
-	 * @param brokerId id assigned to the broker
-	 * @param tmpFolder log directory of the broker
-	 * @return the started broker
-	 * @throws Exception if the broker fails to start for a reason other than a port
-	 *         conflict, or if every attempt hit a port conflict
-	 */
-	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
-		LOG.info("Starting broker with id {}", brokerId);
-		Properties kafkaProperties = new Properties();
-		// properties have to be Strings
-		kafkaProperties.put("advertised.host.name", KAFKA_HOST);
-		kafkaProperties.put("broker.id", Integer.toString(brokerId));
-		kafkaProperties.put("log.dir", tmpFolder.toString());
-		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
-		kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
-		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
-		// for CI stability, increase zookeeper session timeout
-		kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
-		kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
-		// caller-supplied settings are applied last so they override the defaults above
-		if(additionalServerProperties != null) {
-			kafkaProperties.putAll(additionalServerProperties);
-		}
-		final int numTries = 5;
-		for (int i = 1; i <= numTries; i++) {
-			// a fresh port is picked for every attempt
-			int kafkaPort = NetUtils.getAvailablePort();
-			kafkaProperties.put("port", Integer.toString(kafkaPort));
-			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-			try {
-				KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
-				server.startup();
-				return server;
-			}
-			catch (KafkaException e) {
-				if (e.getCause() instanceof BindException) {
-					// port conflict, retry...
-					LOG.info("Port conflict when starting Kafka Broker. Retrying...");
-				}
-				else {
-					throw e;
-				}
-			}
-		}
-		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
-	}
-	/**
-	 * Reads committed consumer offsets from ZooKeeper through its own Curator client.
-	 * Instances must be closed to release that client.
-	 */
-	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
-		private final CuratorFramework offsetClient;
-		private final String groupId;
-		/**
-		 * @param props properties from which the consumer group id ({@code group.id}) is read
-		 */
-		public KafkaOffsetHandlerImpl(Properties props) {
-			offsetClient = createCuratorClient();
-			groupId = props.getProperty("group.id");
-		}
-		/**
-		 * @param topicName topic to look up
-		 * @param partition partition within the topic
-		 * @return the value returned by {@code ZookeeperOffsetHandler.getOffsetFromZooKeeper}
-		 *         for this handler's group
-		 * @throws RuntimeException wrapping any failure of the ZooKeeper lookup
-		 */
-		@Override
-		public Long getCommittedOffset(String topicName, int partition) {
-			try {
-				return ZookeeperOffsetHandler.getOffsetFromZooKeeper(offsetClient, groupId, topicName, partition);
-			} catch (Exception e) {
-				throw new RuntimeException("Exception when getting offsets from Zookeeper", e);
-			}
-		}
-		/** Closes the underlying Curator client. */
-		@Override
-		public void close() {
-			offsetClient.close();
-		}
-	}
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/
-import org.junit.Test;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.*;
-import static java.util.Arrays.asList;
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-public class ClosableBlockingQueueTest {
-	// ------------------------------------------------------------------------
-	//  single-threaded unit tests
-	// ------------------------------------------------------------------------
-	/**
-	 * Checks the state of freshly constructed queues (open, size, emptiness) and that
-	 * queues with equal contents agree on {@code hashCode()} and {@code equals()},
-	 * regardless of which constructor created them.
-	 */
-	@Test
-	public void testCreateQueueHashCodeEquals() {
-		try {
-			// two empty queues: default constructor vs. constructor taking an int argument
-			ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>();
-			ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>(22);
-			assertTrue(queue1.isOpen());
-			assertTrue(queue2.isOpen());
-			assertTrue(queue1.isEmpty());
-			assertTrue(queue2.isEmpty());
-			assertEquals(0, queue1.size());
-			assertEquals(0, queue2.size());
-			assertTrue(queue1.hashCode() == queue2.hashCode());
-			//noinspection EqualsWithItself
-			assertTrue(queue1.equals(queue1));
-			//noinspection EqualsWithItself
-			assertTrue(queue2.equals(queue2));
-			assertTrue(queue1.equals(queue2));
-			assertNotNull(queue1.toString());
-			assertNotNull(queue2.toString());
-			// two queues pre-filled with the same three elements from different list types
-			List<String> elements = new ArrayList<>();
-			elements.add("a");
-			elements.add("b");
-			elements.add("c");
-			ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>(elements);
-			ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>(asList("a", "b", "c"));
-			assertTrue(queue3.isOpen());
-			assertTrue(queue4.isOpen());
-			assertFalse(queue3.isEmpty());
-			assertFalse(queue4.isEmpty());
-			assertEquals(3, queue3.size());
-			assertEquals(3, queue4.size());
-			assertTrue(queue3.hashCode() == queue4.hashCode());
-			//noinspection EqualsWithItself
-			assertTrue(queue3.equals(queue3));
-			//noinspection EqualsWithItself
-			assertTrue(queue4.equals(queue4));
-			assertTrue(queue3.equals(queue4));
-			assertNotNull(queue3.toString());
-			assertNotNull(queue4.toString());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	/**
-	 * Checks that closing an empty queue succeeds and that a closed queue rejects new
-	 * elements: {@code addIfOpen} returns false and {@code add} throws.
-	 */
-	@Test
-	public void testCloseEmptyQueue() {
-		try {
-			ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
-			assertTrue(queue.isOpen());
-			// close() reports true for the empty queue
-			assertTrue(queue.close());
-			assertFalse(queue.isOpen());
-			assertFalse(queue.addIfOpen("element"));
-			assertTrue(queue.isEmpty());
-			try {
-				queue.add("some element");
-				fail("should cause an exception");
-			} catch (IllegalStateException ignored) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	/**
-	 * Checks that {@code close()} returns false while the queue still holds elements and
-	 * only returns true once the queue has been drained; afterwards the queue rejects
-	 * new elements.
-	 */
-	@Test
-	public void testCloseNonEmptyQueue() {
-		try {
-			ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>(asList(1, 2, 3));
-			assertTrue(queue.isOpen());
-			// three elements pending: close() is refused, repeatedly
-			assertFalse(queue.close());
-			assertFalse(queue.close());
-			// remove one element; two are still pending
-			queue.poll();
-			assertFalse(queue.close());
-			assertFalse(queue.close());
-			// remove the rest; close() now succeeds
-			queue.pollBatch();
-			assertTrue(queue.close());
-			assertFalse(queue.isOpen());
-			assertFalse(queue.addIfOpen(42));
-			assertTrue(queue.isEmpty());
-			try {
-				queue.add(99);
-				fail("should cause an exception");
-			} catch (IllegalStateException ignored) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	/**
-	 * Checks {@code peek()} and {@code poll()}: both return null on an empty open queue,
-	 * {@code peek()} leaves the head in place, {@code poll()} removes elements in insertion
-	 * order, and both throw once the queue is closed.
-	 */
-	@Test
-	public void testPeekAndPoll() {
-		try {
-			ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
-			// empty queue: neither call blocks, both return null
-			assertNull(queue.peek());
-			assertNull(queue.peek());
-			assertNull(queue.poll());
-			assertNull(queue.poll());
-			assertEquals(0, queue.size());
-			queue.add("a");
-			queue.add("b");
-			queue.add("c");
-			assertEquals(3, queue.size());
-			// repeated peek() does not consume the head
-			assertEquals("a", queue.peek());
-			assertEquals("a", queue.peek());
-			assertEquals("a", queue.peek());
-			assertEquals(3, queue.size());
-			assertEquals("a", queue.poll());
-			assertEquals("b", queue.poll());
-			assertEquals(1, queue.size());
-			assertEquals("c", queue.peek());
-			assertEquals("c", queue.peek());
-			assertEquals("c", queue.poll());
-			assertEquals(0, queue.size());
-			assertNull(queue.poll());
-			assertNull(queue.peek());
-			assertNull(queue.peek());
-			assertTrue(queue.close());
-			// closed queue: both calls must throw
-			try {
-				queue.peek();
-				fail("should cause an exception");
-			} catch (IllegalStateException ignored) {
-				// expected
-			}
-			try {
-				queue.poll();
-				fail("should cause an exception");
-			} catch (IllegalStateException ignored) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	/**
-	 * Checks {@code pollBatch()}: returns null when the queue is empty, otherwise all
-	 * pending elements in insertion order, and throws once the queue is closed.
-	 */
-	@Test
-	public void testPollBatch() {
-		try {
-			ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
-			assertNull(queue.pollBatch());
-			queue.add("a");
-			queue.add("b");
-			assertEquals(asList("a", "b"), queue.pollBatch());
-			// the batch call drained the queue
-			assertNull(queue.pollBatch());
-			queue.add("c");
-			assertEquals(singletonList("c"), queue.pollBatch());
-			assertNull(queue.pollBatch());
-			assertTrue(queue.close());
-			try {
-				queue.pollBatch();
-				fail("should cause an exception");
-			} catch (IllegalStateException ignored) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	/**
-	 * Checks {@code getElementBlocking()} with and without a timeout argument from a single
-	 * thread: the timed variant returns null on an empty queue, both variants return
-	 * elements in insertion order, and both throw once the queue is closed.
-	 */
-	@Test
-	public void testGetElementBlocking() {
-		try {
-			ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
-			// empty queue: the timed variant gives up and returns null
-			assertNull(queue.getElementBlocking(1));
-			assertNull(queue.getElementBlocking(3));
-			assertNull(queue.getElementBlocking(2));
-			assertEquals(0, queue.size());
-			queue.add("a");
-			queue.add("b");
-			queue.add("c");
-			queue.add("d");
-			queue.add("e");
-			queue.add("f");
-			assertEquals(6, queue.size());
-			// elements are available, so none of these calls has to wait,
-			// whatever timeout value is passed
-			assertEquals("a", queue.getElementBlocking(99));
-			assertEquals("b", queue.getElementBlocking());
-			assertEquals(4, queue.size());
-			assertEquals("c", queue.getElementBlocking(0));
-			assertEquals("d", queue.getElementBlocking(1000000));
-			assertEquals("e", queue.getElementBlocking());
-			assertEquals("f", queue.getElementBlocking(1786598));
-			assertEquals(0, queue.size());
-			assertNull(queue.getElementBlocking(1));
-			assertNull(queue.getElementBlocking(3));
-			assertNull(queue.getElementBlocking(2));
-			assertTrue(queue.close());
-			// closed queue: both variants must throw instead of blocking
-			try {
-				queue.getElementBlocking();
-				fail("should cause an exception");
-			} catch (IllegalStateException ignored) {
-				// expected
-			}
-			try {
-				queue.getElementBlocking(1000000000L);
-				fail("should cause an exception");
-			} catch (IllegalStateException ignored) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	/**
-	 * Checks {@code getBatchBlocking()} with and without a timeout argument from a single
-	 * thread: the timed variant returns an empty list on an empty queue, both variants
-	 * return all pending elements in insertion order, and both throw once the queue is closed.
-	 */
-	@Test
-	public void testGetBatchBlocking() {
-		try {
-			ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
-			// empty queue: the timed variant returns an empty list, not null
-			assertEquals(emptyList(), queue.getBatchBlocking(1));
-			assertEquals(emptyList(), queue.getBatchBlocking(3));
-			assertEquals(emptyList(), queue.getBatchBlocking(2));
-			queue.add("a");
-			queue.add("b");
-			assertEquals(asList("a", "b"), queue.getBatchBlocking(900000009));
-			queue.add("c");
-			queue.add("d");
-			assertEquals(asList("c", "d"), queue.getBatchBlocking());
-			assertEquals(emptyList(), queue.getBatchBlocking(2));
-			queue.add("e");
-			assertEquals(singletonList("e"), queue.getBatchBlocking(0));
-			queue.add("f");
-			assertEquals(singletonList("f"), queue.getBatchBlocking(1000000000));
-			assertEquals(0, queue.size());
-			assertEquals(emptyList(), queue.getBatchBlocking(1));
-			assertEquals(emptyList(), queue.getBatchBlocking(3));
-			assertEquals(emptyList(), queue.getBatchBlocking(2));
-			assertTrue(queue.close());
-			// closed queue: both variants must throw instead of blocking
-			try {
-				queue.getBatchBlocking();
-				fail("should cause an exception");
-			} catch (IllegalStateException ignored) {
-				// expected
-			}
-			try {
-				queue.getBatchBlocking(1000000000L);
-				fail("should cause an exception");
-			} catch (IllegalStateException ignored) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	// ------------------------------------------------------------------------
-	//  multi-threaded tests
-	// ------------------------------------------------------------------------
-	/**
-	 * Checks, for each of the four blocking getters, that a call made on an empty queue
-	 * from another thread ends with an {@code IllegalStateException} once the queue is
-	 * closed. The timed variants use a one-year timeout.
-	 */
-	@Test
-	public void notifyOnClose() {
-		try {
-			final long oneYear = 365L * 24 * 60 * 60 * 1000;
-			// test "getBatchBlocking()"
-			final ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>();
-			QueueCall call1 = new QueueCall() {
-				@Override
-				public void call() throws Exception {
-					queue1.getBatchBlocking();
-				}
-			};
-			testCallExitsOnClose(call1, queue1);
-			// test "getBatchBlocking(long)"
-			final ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>();
-			QueueCall call2 = new QueueCall() {
-				@Override
-				public void call() throws Exception {
-					queue2.getBatchBlocking(oneYear);
-				}
-			};
-			testCallExitsOnClose(call2, queue2);
-			// test "getElementBlocking()"
-			final ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>();
-			QueueCall call3 = new QueueCall() {
-				@Override
-				public void call() throws Exception {
-					queue3.getElementBlocking();
-				}
-			};
-			testCallExitsOnClose(call3, queue3);
-			// test "getElementBlocking(long)"
-			final ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>();
-			QueueCall call4 = new QueueCall() {
-				@Override
-				public void call() throws Exception {
-					queue4.getElementBlocking(oneYear);
-				}
-			};
-			testCallExitsOnClose(call4, queue4);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	/**
-	 * Runs one producer thread and one consumer thread against the same queue.
-	 *
-	 * <p>The producer adds the integers {@code 0..numElements-1} in order and then retries
-	 * {@code close()} until it succeeds. The consumer cycles through the different getter
-	 * methods, checks that elements arrive in order, and on the {@code IllegalStateException}
-	 * raised by the closed queue verifies that it received exactly {@code numElements} elements.
-	 */
-	@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-	@Test
-	public void testMultiThreadedAddGet() {
-		try {
-			final ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>();
-			// errors raised inside the worker threads are handed back to the test thread
-			final AtomicReference<Throwable> pushErrorRef = new AtomicReference<>();
-			final AtomicReference<Throwable> pollErrorRef = new AtomicReference<>();
-			final int numElements = 2000;
-			Thread pusher = new Thread("pusher") {
-				@Override
-				public void run() {
-					try {
-						final Random rnd = new Random();
-						for (int i = 0; i < numElements; i++) {
-							queue.add(i);
-							// sleep a bit, sometimes
-							int sleepTime = rnd.nextInt(3);
-							if (sleepTime > 1) {
-								Thread.sleep(sleepTime);
-							}
-						}
-						// close() only succeeds on an empty queue, so keep trying
-						// until the poller has drained everything
-						while (true) {
-							if (queue.close()) {
-								break;
-							} else {
-								Thread.sleep(5);
-							}
-						}
-					} catch (Throwable t) {
-						pushErrorRef.set(t);
-					}
-				}
-			};
-			pusher.start();
-			Thread poller = new Thread("poller") {
-				@SuppressWarnings("InfiniteLoopStatement")
-				@Override
-				public void run() {
-					try {
-						// number of elements received so far; also selects the next getter
-						int count = 0;
-						try {
-							final Random rnd = new Random();
-							int nextExpected = 0;
-							while (true) {
-								// remainders 0-4 pick a specific getter, 5 and 6 fall to the default
-								int getMethod = count % 7;
-								switch (getMethod) {
-									case 0: {
-										Integer next = queue.getElementBlocking(1);
-										if (next != null) {
-											assertEquals(nextExpected, next.intValue());
-											nextExpected++;
-											count++;
-										}
-										break;
-									}
-									case 1: {
-										List<Integer> nextList = queue.getBatchBlocking();
-										for (Integer next : nextList) {
-											assertNotNull(next);
-											assertEquals(nextExpected, next.intValue());
-											nextExpected++;
-											count++;
-										}
-										break;
-									}
-									case 2: {
-										List<Integer> nextList = queue.getBatchBlocking(1);
-										if (nextList != null) {
-											for (Integer next : nextList) {
-												assertNotNull(next);
-												assertEquals(nextExpected, next.intValue());
-												nextExpected++;
-												count++;
-											}
-										}
-										break;
-									}
-									case 3: {
-										Integer next = queue.poll();
-										if (next != null) {
-											assertEquals(nextExpected, next.intValue());
-											nextExpected++;
-											count++;
-										}
-										break;
-									}
-									case 4: {
-										List<Integer> nextList = queue.pollBatch();
-										if (nextList != null) {
-											for (Integer next : nextList) {
-												assertNotNull(next);
-												assertEquals(nextExpected, next.intValue());
-												nextExpected++;
-												count++;
-											}
-										}
-										break;
-									}
-									default: {
-										Integer next = queue.getElementBlocking();
-										assertNotNull(next);
-										assertEquals(nextExpected, next.intValue());
-										nextExpected++;
-										count++;
-									}
-								}
-								// sleep a bit, sometimes
-								int sleepTime = rnd.nextInt(3);
-								if (sleepTime > 1) {
-									Thread.sleep(sleepTime);
-								}
-							}
-						} catch (IllegalStateException e) {
-							// we get this once the queue is closed
-							assertEquals(numElements, count);
-						}
-					} catch (Throwable t) {
-						pollErrorRef.set(t);
-					}
-				}
-			};
-			poller.start();
-			pusher.join();
-			poller.join();
-			// surface worker-thread failures in the test thread
-			if (pushErrorRef.get() != null) {
-				Throwable t = pushErrorRef.get();
-				t.printStackTrace();
-				fail("Error in pusher: " + t.getMessage());
-			}
-			if (pollErrorRef.get() != null) {
-				Throwable t = pollErrorRef.get();
-				t.printStackTrace();
-				fail("Error in poller: " + t.getMessage());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	// ------------------------------------------------------------------------
-	//  Utils
-	// ------------------------------------------------------------------------
-	/**
-	 * Runs the given call in a separate thread, closes the queue after 100 ms, waits for
-	 * the thread to finish, and asserts that the call ended with an
-	 * {@code IllegalStateException}.
-	 *
-	 * @param call the queue operation to run in the background thread
-	 * @param queue the queue the call operates on; closed by this method
-	 * @throws Exception if the test thread is interrupted while sleeping or joining
-	 */
-	private static void testCallExitsOnClose(
-			final QueueCall call, ClosableBlockingQueue<String> queue) throws Exception {
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-		Runnable runnable = new Runnable() {
-			@Override
-			public void run() {
-				try {
-					call.call();
-				} catch (Throwable t) {
-					// hand the failure to the test thread for inspection
-					errorRef.set(t);
-				}
-			}
-		};
-		Thread thread = new Thread(runnable);
-		thread.start();
-		// give the background thread time to enter the call before closing
-		Thread.sleep(100);
-		queue.close();
-		thread.join();
-		@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-		Throwable cause = errorRef.get();
-		assertTrue(cause instanceof IllegalStateException);
-	}
-	/** A single queue operation that may throw; executed by {@code testCallExitsOnClose}. */
-	private interface QueueCall {
-		/** Performs the queue operation. */
-		void call() throws Exception;
-	}
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/
-log4j.rootLogger=INFO, testlogger
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
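-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-log4j.logger.org.apache.zookeeper=OFF, testlogger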
-log4j.logger.state.change.logger=OFF, testlogger
-log4j.logger.kafka=OFF, testlogger
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
\ No newline at end of file
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
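-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">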
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-connectors</artifactId>
-		<version>1.2-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-	<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
-	<name>flink-connector-kafka-0.9</name>
-	<packaging>jar</packaging>
-	<!-- Allow users to pass custom connector versions -->
-	<properties>
-		<kafka.version>0.9.0.1</kafka.version>
-	</properties>
-	<dependencies>
-		<!-- core dependencies -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_2.10</artifactId>
-			<version>${project.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.kafka</groupId>
-					<artifactId>kafka_${scala.binary.version}</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-			<!-- Projects depending on this project,
-			won't depend on flink-table. -->
-			<optional>true</optional>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka-clients</artifactId>
-			<version>${kafka.version}</version>
-		</dependency>
-		<!-- test dependencies -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_2.10</artifactId>
-			<version>${project.version}</version>
-			<exclusions>
-				<!-- exclude 0.8 dependencies -->
-				<exclusion>
-					<groupId>org.apache.kafka</groupId>
-					<artifactId>kafka_${scala.binary.version}</artifactId>
-				</exclusion>
-			</exclusions>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<!-- include 0.9 server for tests  -->
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_${scala.binary.version}</artifactId>
-			<version>${kafka.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-metrics-jmx</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-minikdc</artifactId>
-			<version>${minikdc.version}</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-	<build>
-		<plugins>
-			<!-- Packages a test-jar for this module; the include pattern below restricts
-			its content to the KafkaTestEnvironmentImpl classes. -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-						<configuration>
-							<includes>
-								<include>**/KafkaTestEnvironmentImpl*</include>
-							</includes>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			<!-- Attaches the test sources jar, using the same include pattern as the
-			test-jar execution of the maven-jar-plugin. -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-source-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>attach-test-sources</id>
-						<goals>
-							<goal>test-jar-no-fork</goal>
-						</goals>
-						<configuration>
-							<includes>
-								<include>**/KafkaTestEnvironmentImpl*</include>
-							</includes>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			<!-- Test execution settings: fork count and JVM arguments for the forked test JVM. -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
-					<forkCount>1</forkCount>
-					<argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
-				</configuration>
-			</plugin>
-			<!--
-            Required to pull the Mini-KDC transitive dependency
-            -->
-			<plugin>
-				<groupId>org.apache.felix</groupId>
-				<artifactId>maven-bundle-plugin</artifactId>
-				<version>3.0.1</version>
-				<inherited>true</inherited>
-				<extensions>true</extensions>
-			</plugin>
-		</plugins>
-	</build>
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.SerializedValue;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import static org.apache.flink.util.Preconditions.checkNotNull;
- * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
- * Apache Kafka 0.9.x. The consumer can run in multiple parallel instances, each of which will pull
- * data from one or more Kafka partitions. 
- * 
- * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
- * during a failure, and that the computation processes elements "exactly once". 
- * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
- *
- * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
- * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
- * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
- * has consumed a topic.</p>
- *
- * <p>Please refer to Kafka's documentation for the available configuration properties:
- * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
- *
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
- * is constructed. That means that the client that submits the program needs to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
- */
-public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
-	private static final long serialVersionUID = 2324564345203409112L;
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
-	/**  Configuration key to change the polling timeout **/
-	public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
-	/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
-	 * available. If 0, returns immediately with any records that are available now. */
-	public static final long DEFAULT_POLL_TIMEOUT = 100L;
-	// ------------------------------------------------------------------------
-	/** User-supplied properties for Kafka **/
-	protected final Properties properties;
-	/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
-	 * available. If 0, returns immediately with any records that are available now */
-	protected final long pollTimeout;
-	// ------------------------------------------------------------------------
-	/**
-	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x that reads a single topic.
-	 *
-	 * <p>The topic is wrapped into a singleton list and handed to the constructor that accepts
-	 * a list of topics together with a {@link DeserializationSchema}.
-	 *
-	 * @param topic
-	 *           The name of the topic that should be consumed.
-	 * @param valueDeserializer
-	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-	 * @param props
-	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-	 */
-	public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
-		this(Collections.singletonList(topic), valueDeserializer, props);
-	}
-	/**
-	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x that reads a single topic.
-	 *
-	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
-	 * pairs, offsets, and topic names from Kafka. The topic is wrapped into a singleton list and
-	 * handed to the constructor that accepts a list of topics.
-	 *
-	 * @param topic
-	 *           The name of the topic that should be consumed.
-	 * @param deserializer
-	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-	 * @param props
-	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-	 */
-	public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
-		this(Collections.singletonList(topic), deserializer, props);
-	}
-	/**
-	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x
-	 *
-	 * <p>This constructor allows passing multiple topics to the consumer. The given schema is
-	 * wrapped in a {@link KeyedDeserializationSchemaWrapper} and handed to the constructor that
-	 * accepts a {@link KeyedDeserializationSchema}.
-	 *
-	 * @param topics
-	 *           The Kafka topics to read from.
-	 * @param deserializer
-	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-	 * @param props
-	 *           The properties that are used to configure both the fetcher and the offset handler.
-	 */
-	public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
-		this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
-	}
-	/**
-	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x
-	 *
-	 * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
-	 *
-	 * <p>The given properties object is stored by reference and modified in place: its key and
-	 * value deserializer entries are overwritten with the {@link ByteArrayDeserializer}. The
-	 * polling timeout is read from {@link #KEY_POLL_TIMEOUT} and falls back to
-	 * {@link #DEFAULT_POLL_TIMEOUT} when that key is not set.
-	 *
-	 * @param topics
-	 *           The Kafka topics to read from.
-	 * @param deserializer
-	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-	 * @param props
-	 *           The properties that are used to configure both the fetcher and the offset handler.
-	 *           Must not be null.
-	 *
-	 * @throws IllegalArgumentException
-	 *           If the value configured under {@link #KEY_POLL_TIMEOUT} cannot be parsed.
-	 */
-	public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
-		super(topics, deserializer);
-		this.properties = checkNotNull(props, "props");
-		// force the byte[] deserializers; this mutates the caller-supplied properties
-		setDeserializer(this.properties);
-		// configure the polling timeout
-		try {
-			if (properties.containsKey(KEY_POLL_TIMEOUT)) {
-				this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
-			} else {
-				this.pollTimeout = DEFAULT_POLL_TIMEOUT;
-			}
-		}
-		catch (Exception e) {
-			throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
-		}
-	}
-	@Override
-	protected AbstractFetcher<T, ?> createFetcher(
-			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> thisSubtaskPartitions,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext) throws Exception {
-		boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
-		return new Kafka09Fetcher<>(
-				sourceContext,
-				thisSubtaskPartitions,
-				watermarksPeriodic,
-				watermarksPunctuated,
-				runtimeContext.getProcessingTimeService(),
-				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
-				runtimeContext.getUserCodeClassLoader(),
-				runtimeContext.isCheckpointingEnabled(),
-				runtimeContext.getTaskNameWithSubtasks(),
-				runtimeContext.getMetricGroup(),
-				deserializer,
-				properties,
-				pollTimeout,
-				useMetrics);
-	}
-	/**
-	 * Looks up the partitions of all given topics using a temporary {@link KafkaConsumer}
-	 * that is created from this consumer's properties and closed again before returning.
-	 *
-	 * @param topics The topics whose partitions should be listed.
-	 * @return The partitions of all topics for which Kafka returned partition information.
-	 *
-	 * @throws RuntimeException If no partition could be retrieved for any of the topics.
-	 */
-	@Override
-	protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
-		// read the partitions that belong to the listed topics
-		final List<KafkaTopicPartition> partitions = new ArrayList<>();
-		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
-			for (final String topic: topics) {
-				// get partitions for each topic
-				List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
-				// for non existing topics, the list might be null.
-				if (partitionsForTopic != null) {
-					partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
-				}
-			}
-		}
-		if (partitions.isEmpty()) {
-			throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
-		}
-		// we now have a list of partitions which is the same for all parallel consumer instances.
-		LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics);
-		if (LOG.isInfoEnabled()) {
-			logPartitionInfo(LOG, partitions);
-		}
-		return partitions;
-	}
-	// ------------------------------------------------------------------------
-	//  Utilities 
-	// ------------------------------------------------------------------------
-	/**
-	 * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable)
-	 * 
-	 * @param partitions A list of Kafka PartitionInfos.
-	 * @return A list of KafkaTopicPartitions
-	 */
-	private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
-		checkNotNull(partitions);
-		List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size());
-		for (PartitionInfo pi : partitions) {
-			ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
-		}
-		return ret;
-	}
-	/**
-	 * Forces the ByteArrayDeserializer as key and value deserializer in the given Kafka
-	 * properties. A differing deserializer that is already configured is replaced, and a
-	 * warning is logged for it.
-	 *
-	 * @param props The Kafka properties to modify in place.
-	 */
-	private static void setDeserializer(Properties props) {
-		final String byteArrayDeSer = ByteArrayDeserializer.class.getCanonicalName();
-		final Object configuredKeyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-		final Object configuredValDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-		// warn about user settings that are about to be replaced
-		final boolean keyIsReplaced = configuredKeyDeSer != null && !configuredKeyDeSer.equals(byteArrayDeSer);
-		if (keyIsReplaced) {
-			LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-		}
-		final boolean valueIsReplaced = configuredValDeSer != null && !configuredValDeSer.equals(byteArrayDeSer);
-		if (valueIsReplaced) {
-			LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-		}
-		// unconditionally register the byte[] deserializer for keys and values
-		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeSer);
-		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeSer);
-	}
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import java.util.Properties;
- * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.9.
- *
- * Please note that this producer does not have any reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
-	private static final long serialVersionUID = 1L;
-	// ------------------- Keyless serialization schema constructors ----------------------
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>The producer properties are derived from the broker list, the keyless schema is wrapped
-	 * in a {@link KeyedSerializationSchemaWrapper}, and a {@link FixedPartitioner} is used.
-	 *
-	 * @param brokerList
-	 * 			Comma separated addresses of the brokers
-	 * @param topicId
-	 * 			ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
-	 */
-	public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
-	}
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>The keyless schema is wrapped in a {@link KeyedSerializationSchemaWrapper} and a
-	 * {@link FixedPartitioner} is used.
-	 *
-	 * @param topicId
-	 * 			ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
-	 * @param producerConfig
-	 * 			Properties with the producer configuration.
-	 */
-	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
-	}
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>The keyless schema is wrapped in a {@link KeyedSerializationSchemaWrapper} before it is
-	 * handed to the key/value constructor.
-	 *
-	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
-	 */
-	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-	}
-	// ------------------- Key/Value serialization schema constructors ----------------------
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>The producer properties are derived from the broker list and a
-	 * {@link FixedPartitioner} is used.
-	 *
-	 * @param brokerList
-	 * 			Comma separated addresses of the brokers
-	 * @param topicId
-	 * 			ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 			User defined serialization schema supporting key/value messages
-	 */
-	public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
-	}
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>A {@link FixedPartitioner} is used to assign messages to Kafka partitions.
-	 *
-	 * @param topicId
-	 * 			ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 			User defined serialization schema supporting key/value messages
-	 * @param producerConfig
-	 * 			Properties with the producer configuration.
-	 */
-	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
-	}
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>All arguments are passed unchanged to the {@link FlinkKafkaProducerBase} constructor.
-	 *
-	 * @param topicId The topic to write data to
-	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 */
-	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
-		super(topicId, serializationSchema, producerConfig, customPartitioner);
-	}
-	/**
-	 * Flushes the Kafka producer, if one is currently set.
-	 */
-	@Override
-	protected void flush() {
-		if (this.producer == null) {
-			// no producer instance, nothing to flush
-			return;
-		}
-		producer.flush();
-	}
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import java.util.Properties;
- * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
- */
-public class Kafka09JsonTableSink extends KafkaJsonTableSink {
-	/**
-	 * Creates {@link KafkaTableSink} for Kafka 0.9. All arguments are passed unchanged to the
-	 * {@link KafkaJsonTableSink} constructor.
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @param partitioner Kafka partitioner
-	 */
-	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
-	/**
-	 * Creates the Kafka 0.9 producer used by this sink.
-	 *
-	 * @param topic Kafka topic to write to.
-	 * @param properties Properties for the Kafka producer.
-	 * @param serializationSchema Schema used to serialize the rows.
-	 * @param partitioner Partitioner handed to the producer.
-	 * @return A new {@link FlinkKafkaProducer09} built from the given arguments.
-	 */
-	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-		return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
-	}
-	/**
-	 * Creates a new sink instance from this sink's topic, properties and partitioner.
-	 *
-	 * @return A new {@link Kafka09JsonTableSink} with the same topic, properties and partitioner.
-	 */
-	@Override
-	protected Kafka09JsonTableSink createCopy() {
-		return new Kafka09JsonTableSink(topic, properties, partitioner);
-	}
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import java.util.Properties;
- * Kafka {@link StreamTableSource} for Kafka 0.9.
- */
-public class Kafka09JsonTableSource extends KafkaJsonTableSource {
-	/**
-	 * Creates a Kafka 0.9 JSON {@link StreamTableSource} with the field types given as
-	 * {@link TypeInformation}. All arguments are passed unchanged to the
-	 * {@link KafkaJsonTableSource} constructor.
-	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param fieldNames Row field names.
-	 * @param fieldTypes Row field types.
-	 */
-	public Kafka09JsonTableSource(
-			String topic,
-			Properties properties,
-			String[] fieldNames,
-			TypeInformation<?>[] fieldTypes) {
-		super(topic, properties, fieldNames, fieldTypes);
-	}
-	/**
-	 * Creates a Kafka 0.9 JSON {@link StreamTableSource} with the field types given as
-	 * classes. All arguments are passed unchanged to the {@link KafkaJsonTableSource}
-	 * constructor.
-	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param fieldNames Row field names.
-	 * @param fieldTypes Row field types, given as classes.
-	 */
-	public Kafka09JsonTableSource(
-			String topic,
-			Properties properties,
-			String[] fieldNames,
-			Class<?>[] fieldTypes) {
-		super(topic, properties, fieldNames, fieldTypes);
-	}
-	/**
-	 * Creates the Kafka 0.9 consumer used by this table source.
-	 *
-	 * @param topic                 Kafka topic to consume.
-	 * @param properties            Properties for the Kafka consumer.
-	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @return A new {@link FlinkKafkaConsumer09} built from the given arguments.
-	 */
-	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
-		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
-	}
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import java.util.Properties;
- * Kafka {@link StreamTableSource} for Kafka 0.9.
- */
-public class Kafka09TableSource extends KafkaTableSource {
-	/**
-	 * Creates a Kafka 0.9 {@link StreamTableSource} with the field types given as
-	 * {@link TypeInformation}. All arguments are passed unchanged to the
-	 * {@link KafkaTableSource} constructor.
-	 *
-	 * @param topic                 Kafka topic to consume.
-	 * @param properties            Properties for the Kafka consumer.
-	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @param fieldNames            Row field names.
-	 * @param fieldTypes            Row field types.
-	 */
-	public Kafka09TableSource(
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			String[] fieldNames,
-			TypeInformation<?>[] fieldTypes) {
-		super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
-	}
-	/**
-	 * Creates a Kafka 0.9 {@link StreamTableSource} with the field types given as classes.
-	 * All arguments are passed unchanged to the {@link KafkaTableSource} constructor.
-	 *
-	 * @param topic                 Kafka topic to consume.
-	 * @param properties            Properties for the Kafka consumer.
-	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @param fieldNames            Row field names.
-	 * @param fieldTypes            Row field types, given as classes.
-	 */
-	public Kafka09TableSource(
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			String[] fieldNames,
-			Class<?>[] fieldTypes) {
-		super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
-	}
-	/**
-	 * Creates the Kafka 0.9 consumer used by this table source.
-	 *
-	 * @param topic                 Kafka topic to consume.
-	 * @param properties            Properties for the Kafka consumer.
-	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @return A new {@link FlinkKafkaConsumer09} built from the given arguments.
-	 */
-	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
-		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
-	}
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.ThreadSafe;
-import static org.apache.flink.util.Preconditions.checkNotNull;
- * The Handover is a utility to hand over data (a buffer of records) and exception from a
- * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a
- * "size one blocking queue", with some extras around exception reporting, closing, and
- * waking up thread without {@link Thread#interrupt() interrupting} threads.
- * 
- * <p>This class is used in the Flink Kafka Consumer to hand over data and exceptions between
- * the thread that runs the KafkaConsumer class and the main thread.
- * 
- * <p>The Handover has the notion of "waking up" the producer thread with a {@link WakeupException}
- * rather than a thread interrupt.
- * 
- * <p>The Handover can also be "closed", signalling from one thread to the other that
- * the thread has terminated.
- */
-public final class Handover implements Closeable {
-	private final Object lock = new Object();
-	private ConsumerRecords<byte[], byte[]> next;
-	private Throwable error;
-	private boolean wakeupProducer;
-	/**
-	 * Polls the next element from the Handover, possibly blocking until the next element is
-	 * available. This method behaves similar to polling from a blocking queue.
-	 * 
-	 * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then
-	 * that exception is thrown rather than an element being returned.
-	 * 
-	 * @return The next element (buffer of records, never null).
-	 * 
-	 * @throws ClosedException Thrown if the Handover was {@link #close() closed}.
-	 * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
-	 *                   An interruption while waiting for the next element also surfaces here.
-	 */
-	@Nonnull
-	public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
-		synchronized (lock) {
-			// wait until either an element was handed over or an error (including close) was recorded
-			while (next == null && error == null) {
-				lock.wait();
-			}
-			ConsumerRecords<byte[], byte[]> n = next;
-			if (n != null) {
-				// take the element out of the slot and notify waiting threads,
-				// so that a producer blocked in produce() can hand over the next element
-				next = null;
-				lock.notifyAll();
-				return n;
-			}
-			else {
-				// no element, so the loop above was left because an error is set
-				ExceptionUtils.rethrowException(error, error.getMessage());
-				// this statement cannot be reached since the above method always throws an exception
-				// this is only here to silence the compiler and any warnings
-				return ConsumerRecords.empty(); 
-			}
-		}
-	}
-	/**
-	 * Hands over an element from the producer. If the Handover already has an element that was
-	 * not yet picked up by the consumer thread, this call blocks until the consumer picks up that
-	 * previous element.
-	 * 
-	 * <p>This behavior is similar to a "size one" blocking queue.
-	 * 
-	 * @param element The next element to hand over. Must not be null.
-	 * 
-	 * @throws InterruptedException
-	 *                 Thrown, if the thread is interrupted while blocking for the Handover to be empty.
-	 * @throws WakeupException
-	 *                 Thrown, if the {@link #wakeupProducer()} method is called while blocking for
-	 *                 the Handover to be empty.
-	 * @throws ClosedException
-	 *                 Thrown if the Handover was closed or concurrently being closed.
-	 */
-	public void produce(final ConsumerRecords<byte[], byte[]> element)
-			throws InterruptedException, WakeupException, ClosedException {
-		checkNotNull(element);
-		synchronized (lock) {
-			// wait while the slot is occupied, unless a wakeup was requested
-			while (next != null && !wakeupProducer) {
-				lock.wait();
-			}
-			// consume the wakeup request, so that it affects this call only
-			wakeupProducer = false;
-			// if there is still an element, we must have been woken up
-			if (next != null) {
-				throw new WakeupException();
-			}
-			// if there is no error, then this is open and can accept this element
-			else if (error == null) {
-				next = element;
-				// notify waiting threads, so that a consumer blocked in pollNext() sees the element
-				lock.notifyAll();
-			}
-			// an error marks this as closed for the producer
-			else {
-				throw new ClosedException();
-			}
-		}
-	}
-	/**
-	 * Reports an exception. The consumer will throw the given exception immediately, if
-	 * it is currently blocked in the {@link #pollNext()} method, or the next time it
-	 * calls that method.
-	 * 
-	 * <p>After this method has been called, no call to either {@link #produce(ConsumerRecords)}
-	 * or {@link #pollNext()} will ever return regularly any more, but will always return
-	 * exceptionally.
-	 * 
-	 * <p>If another exception was already reported, this method does nothing.
-	 * 
-	 * <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
-	 * 
-	 * @param t The exception to report. Must not be null.
-	 */
-	public void reportError(Throwable t) {
-		checkNotNull(t);
-		synchronized (lock) {
-			// do not override the initial exception
-			if (error == null) {
-				error = t;
-			}
-			// discard an element that was handed over but not yet picked up
-			next = null;
-			// notify all threads waiting in pollNext() or produce(), so they re-check the state
-			lock.notifyAll();
-		}
-	}
-	/**
-	 * Closes the handover. Both the {@link #produce(ConsumerRecords)} method and the
-	 * {@link #pollNext()} will throw a {@link ClosedException} on any currently blocking and
-	 * future invocations.
-	 * 
-	 * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
-	 * that exception will not be overridden. The consumer thread will throw that exception upon
-	 * calling {@link #pollNext()}, rather than the {@code ClosedException}.
-	 */
-	@Override
-	public void close() {
-		synchronized (lock) {
-			// discard a pending element and a pending wakeup request
-			next = null;
-			wakeupProducer = false;
-			// "closed" is represented as an error, unless an error was already reported
-			if (error == null) {
-				error = new ClosedException();
-			}
-			// notify all threads waiting in pollNext() or produce(), so they re-check the state
-			lock.notifyAll();
-		}
-	}
-	/**
-	 * Wakes the producer thread up. If the producer thread is currently blocked in
-	 * the {@link #produce(ConsumerRecords)} method, it will exit the method throwing
-	 * a {@link WakeupException}.
-	 *
-	 * <p>The wakeup request is stored as a flag, which is reset by the next call to
-	 * {@link #produce(ConsumerRecords)} and by {@link #close()}.
-	 */
-	public void wakeupProducer() {
-		synchronized (lock) {
-			wakeupProducer = true;
-			// notify waiting threads, so that a producer blocked in produce() re-checks the flag
-			lock.notifyAll();
-		}
-	}
-	// ------------------------------------------------------------------------
-	/**
-	 * A checked exception thrown by the Handover in the {@link #pollNext()} or
-	 * {@link #produce(ConsumerRecords)} method, after the Handover was closed via
-	 * {@link #close()}.
-	 */
-	public static final class ClosedException extends Exception {
-		private static final long serialVersionUID = 1L;
-	}
-	/**
-	 * A special checked exception thrown by the Handover in the {@link #produce(ConsumerRecords)}
-	 * method when the producer is woken up from a blocking call via {@link #wakeupProducer()}.
-	 */
-	public static final class WakeupException extends Exception {
-		private static final long serialVersionUID = 1L;
-	}