You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:27 UTC

[37/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
new file mode 100644
index 0000000..42b9682
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+
+@SuppressWarnings("serial")
+public class Kafka010ProducerITCase extends KafkaProducerTestBase {
+
+	@Test
+	public void testCustomPartitioning() {
+		runCustomPartitioningTest();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..f15fd45
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 0.10
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+	private File tmpZkDir;
+	private File tmpKafkaParent;
+	private List<File> tmpKafkaDirs;
+	private List<KafkaServer> brokers;
+	private TestingServer zookeeper;
+	private String zookeeperConnectionString;
+	private String brokerConnectionString = "";
+	private Properties standardProps;
+	private Properties additionalServerProperties;
+	private boolean secureMode = false;
+	// 6 seconds is default. Seems to be too small for travis. 30 seconds
+	private int zkTimeout = 30000;
+
+	public String getBrokerConnectionString() {
+		return brokerConnectionString;
+	}
+
+	@Override
+	public Properties getStandardProperties() {
+		return standardProps;
+	}
+
+	@Override
+	public Properties getSecureProperties() {
+		Properties prop = new Properties();
+		if(secureMode) {
+			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+			prop.put("security.protocol", "SASL_PLAINTEXT");
+			prop.put("sasl.kerberos.service.name", "kafka");
+
+			//add special timeout for Travis
+			prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+			prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+			prop.setProperty("metadata.fetch.timeout.ms","120000");
+		}
+		return prop;
+	}
+
+	@Override
+	public String getVersion() {
+		return "0.10";
+	}
+
+	@Override
+	public List<KafkaServer> getBrokers() {
+		return brokers;
+	}
+
+	@Override
+	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
+		return new FlinkKafkaConsumer010<>(topics, readSchema, props);
+	}
+
+	@Override
+	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return new StreamSink<>(prod);
+	}
+
+
+	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return stream.addSink(prod);
+	}
+
+	@Override
+	public KafkaOffsetHandler createOffsetHandler(Properties props) {
+		return new KafkaOffsetHandlerImpl(props);
+	}
+
+	@Override
+	public void restartBroker(int leaderId) throws Exception {
+		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
+	}
+
+	@Override
+	public int getLeaderToShutDown(String topic) throws Exception {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			MetadataResponse.PartitionMetadata firstPart = null;
+			do {
+				if (firstPart != null) {
+					LOG.info("Unable to find leader. error code {}", firstPart.error().code());
+					// not the first try. Sleep a bit
+					Thread.sleep(150);
+				}
+
+				List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata();
+				firstPart = partitionMetadata.get(0);
+			}
+			while (firstPart.error().code() != 0);
+
+			return firstPart.leader().id();
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	@Override
+	public int getBrokerId(KafkaServer server) {
+		return server.config().brokerId();
+	}
+
+	@Override
+	public boolean isSecureRunSupported() {
+		return true;
+	}
+
+	@Override
+	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+		//increase the timeout since in Travis ZK connection takes long time for secure connection.
+		if(secureMode) {
+			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
+			numKafkaServers = 1;
+			zkTimeout = zkTimeout * 15;
+		}
+
+		this.additionalServerProperties = additionalServerProperties;
+		this.secureMode = secureMode;
+		File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
+		for (int i = 0; i < numKafkaServers; i++) {
+			File tmpDir = new File(tmpKafkaParent, "server-" + i);
+			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+			tmpKafkaDirs.add(tmpDir);
+		}
+
+		zookeeper = null;
+		brokers = null;
+
+		try {
+			zookeeper = new TestingServer(-	1, tmpZkDir);
+			zookeeperConnectionString = zookeeper.getConnectString();
+			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
+
+			LOG.info("Starting KafkaServer");
+			brokers = new ArrayList<>(numKafkaServers);
+
+			for (int i = 0; i < numKafkaServers; i++) {
+				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+
+				if(secureMode) {
+					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+				} else {
+					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+				}
+			}
+
+			LOG.info("ZK and KafkaServer started.");
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail("Test setup failed: " + t.getMessage());
+		}
+
+		standardProps = new Properties();
+		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+		standardProps.setProperty("group.id", "flink-tests");
+		standardProps.setProperty("enable.auto.commit", "false");
+		standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value)
+		standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+	}
+
+	@Override
+	public void shutdown() {
+		for (KafkaServer broker : brokers) {
+			if (broker != null) {
+				broker.shutdown();
+			}
+		}
+		brokers.clear();
+
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+			}
+			catch (Exception e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+			zookeeper = null;
+		}
+
+		// clean up the temp spaces
+
+		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpKafkaParent);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+		if (tmpZkDir != null && tmpZkDir.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpZkDir);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+	}
+
+	public ZkUtils getZkUtils() {
+		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+		return ZkUtils.apply(creator, false);
+	}
+
+	@Override
+	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+		// create topic with one client
+		LOG.info("Creating topic {}", topic);
+
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
+		} finally {
+			zkUtils.close();
+		}
+
+		// validate that the topic has been created
+		final long deadline = System.currentTimeMillis() + 30000;
+		do {
+			try {
+				if(secureMode) {
+					//increase wait time since in Travis ZK timeout occurs frequently
+					int wait = zkTimeout / 100;
+					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
+			} catch (InterruptedException e) {
+				// restore interrupted state
+			}
+			// we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are
+			// not always correct.
+
+			// create a new ZK utils connection
+			ZkUtils checkZKConn = getZkUtils();
+			if(AdminUtils.topicExists(checkZKConn, topic)) {
+				checkZKConn.close();
+				return;
+			}
+			checkZKConn.close();
+		}
+		while (System.currentTimeMillis() < deadline);
+		fail("Test topic could not be created");
+	}
+
+	@Override
+	public void deleteTestTopic(String topic) {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			LOG.info("Deleting topic {}", topic);
+
+			ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+
+			AdminUtils.deleteTopic(zkUtils, topic);
+
+			zk.close();
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	/**
+	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+	 */
+	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+		Properties kafkaProperties = new Properties();
+
+		// properties have to be Strings
+		kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+		kafkaProperties.put("broker.id", Integer.toString(brokerId));
+		kafkaProperties.put("log.dir", tmpFolder.toString());
+		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+
+		// for CI stability, increase zookeeper session timeout
+		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
+		if(additionalServerProperties != null) {
+			kafkaProperties.putAll(additionalServerProperties);
+		}
+
+		final int numTries = 5;
+
+		for (int i = 1; i <= numTries; i++) {
+			int kafkaPort = NetUtils.getAvailablePort();
+			kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+			//to support secure kafka cluster
+			if(secureMode) {
+				LOG.info("Adding Kafka secure configurations");
+				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.putAll(getSecureProperties());
+			}
+
+			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+			try {
+				scala.Option<String> stringNone = scala.Option.apply(null);
+				KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+				server.startup();
+				return server;
+			}
+			catch (KafkaException e) {
+				if (e.getCause() instanceof BindException) {
+					// port conflict, retry...
+					LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+				}
+				else {
+					throw e;
+				}
+			}
+		}
+
+		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+	}
+
+	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+		private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+		public KafkaOffsetHandlerImpl(Properties props) {
+			offsetClient = new KafkaConsumer<>(props);
+		}
+
+		@Override
+		public Long getCommittedOffset(String topicName, int partition) {
+			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+			return (committed != null) ? committed.offset() : null;
+		}
+
+		@Override
+		public void close() {
+			offsetClient.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..fbeb110
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
new file mode 100644
index 0000000..d1fecb6
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -0,0 +1,219 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
+	<name>flink-connector-kafka-0.8</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<kafka.version>0.8.2.2</kafka.version>
+	</properties>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-curator-recipes</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-base_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project,
+			won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_${scala.binary.version}</artifactId>
+			<version>${kafka.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>net.sf.jopt-simple</groupId>
+					<artifactId>jopt-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-reflect</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.yammer.metrics</groupId>
+					<artifactId>metrics-annotation</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-jmx</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-base_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+					<forkCount>1</forkCount>
+				</configuration>
+			</plugin>
+			<!-- Relocate curator -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes combine.children="append">
+									<include>org.apache.flink:flink-shaded-curator-recipes</include>
+								</includes>
+							</artifactSet>
+							<relocations combine.children="append">
+								<relocation>
+									<pattern>org.apache.curator</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+	
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
new file mode 100644
index 0000000..0aacccd
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.apache.flink.util.PropertiesUtil.getInt;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka 0.8.x. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions. 
+ * 
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once". 
+ * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p>
+ * 
+ * <p>Flink's Kafka Consumer is designed to be compatible with Kafka's High-Level Consumer API (0.8.x).
+ * Most of Kafka's configuration variables can be used with this consumer as well:
+ *         <ul>
+ *             <li>socket.timeout.ms</li>
+ *             <li>socket.receive.buffer.bytes</li>
+ *             <li>fetch.message.max.bytes</li>
+ *             <li>auto.offset.reset with the values "largest", "smallest"</li>
+ *             <li>fetch.wait.max.ms</li>
+ *         </ul>
+ *     </li>
+ * </ul>
+ * 
+ * <h1>Offset handling</h1>
+ * 
+ * <p>Offsets whose records have been read and are checkpointed will be committed back to ZooKeeper
+ * by the offset handler. In addition, the offset handler finds the point where the source initially
+ * starts reading from the stream, when the streaming job is started.</p>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p>If checkpointing is disabled, the consumer will periodically commit the current offset
+ * to Zookeeper.</p>
+ *
+ * <p>When using a Kafka topic to send data between Flink jobs, we recommend using the
+ * {@see TypeInformationSerializationSchema} and {@see TypeInformationKeyValueSerializationSchema}.</p>
+ * 
+ * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
+ * is constructed. That means that the client that submits the program needs to be able to
+ * reach the Kafka brokers or ZooKeeper.</p>
+ */
+public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
+
+	private static final long serialVersionUID = -6272159445203409112L;
+
+	/** Configuration key for the number of retries for getting the partition info */
+	public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
+
+	/** Default number of retries for getting the partition info. One retry means going through the full list of brokers */
+	public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
+
+	// ------------------------------------------------------------------------
+
+	/** The properties to parametrize the Kafka consumer and ZooKeeper client */ 
+	private final Properties kafkaProperties;
+
+	/** The behavior when encountering an invalid offset (see {@link OffsetRequest}) */
+	private final long invalidOffsetBehavior;
+
+	/** The interval in which to automatically commit (-1 if deactivated) */
+	private final long autoCommitInterval;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		this(Collections.singletonList(topic), valueDeserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+	 *
+	 * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		this(Collections.singletonList(topic), deserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+	 *
+	 * This constructor allows passing multiple topics to the consumer.
+	 *
+	 * @param topics
+	 *           The Kafka topics to read from.
+	 * @param deserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 */
+	public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
+		this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+	 *
+	 * This constructor allows passing multiple topics and a key/value deserialization schema.
+	 * 
+	 * @param topics
+	 *           The Kafka topics to read from.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 */
+	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		super(topics, deserializer);
+
+		checkNotNull(topics, "topics");
+		this.kafkaProperties = checkNotNull(props, "props");
+
+		// validate the zookeeper properties
+		validateZooKeeperConfig(props);
+
+		this.invalidOffsetBehavior = getInvalidOffsetBehavior(props);
+		this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000);
+	}
+
+	@Override
+	protected AbstractFetcher<T, ?> createFetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> thisSubtaskPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext) throws Exception {
+
+		boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false"));
+
+		return new Kafka08Fetcher<>(sourceContext, thisSubtaskPartitions,
+				watermarksPeriodic, watermarksPunctuated,
+				runtimeContext, deserializer, kafkaProperties,
+				invalidOffsetBehavior, autoCommitInterval, useMetrics);
+	}
+
+	@Override
+	protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
+		// Connect to a broker to get the partitions for all topics
+		List<KafkaTopicPartition> partitionInfos =
+			KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, kafkaProperties));
+
+		if (partitionInfos.size() == 0) {
+			throw new RuntimeException(
+				"Unable to retrieve any partitions for the requested topics " + topics +
+					". Please check previous log entries");
+		}
+
+		if (LOG.isInfoEnabled()) {
+			logPartitionInfo(LOG, partitionInfos);
+		}
+
+		return partitionInfos;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Kafka / ZooKeeper communication utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Send request to Kafka to get partitions for topic.
+	 * 
+	 * @param topics The name of the topics.
+	 * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
+	 */
+	public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
+		String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+		final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
+		
+		checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+		String[] seedBrokers = seedBrokersConfString.split(",");
+		List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
+
+		final String clientId = "flink-kafka-consumer-partition-lookup";
+		final int soTimeout = getInt(properties, "socket.timeout.ms", 30000);
+		final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536);
+
+		Random rnd = new Random();
+		retryLoop: for (int retry = 0; retry < numRetries; retry++) {
+			// we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
+			// parallel source instances start. Still, we try all available brokers.
+			int index = rnd.nextInt(seedBrokers.length);
+			brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
+				String seedBroker = seedBrokers[index];
+				LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
+				if (++index == seedBrokers.length) {
+					index = 0;
+				}
+
+				URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
+				SimpleConsumer consumer = null;
+				try {
+					consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
+
+					TopicMetadataRequest req = new TopicMetadataRequest(topics);
+					kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+					List<TopicMetadata> metaData = resp.topicsMetadata();
+
+					// clear in case we have an incomplete list from previous tries
+					partitions.clear();
+					for (TopicMetadata item : metaData) {
+						if (item.errorCode() != ErrorMapping.NoError()) {
+							// warn and try more brokers
+							LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " +
+									"for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
+							continue brokersLoop;
+						}
+						if (!topics.contains(item.topic())) {
+							LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
+							continue brokersLoop;
+						}
+						for (PartitionMetadata part : item.partitionsMetadata()) {
+							Node leader = brokerToNode(part.leader());
+							KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
+							KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
+							partitions.add(pInfo);
+						}
+					}
+					break retryLoop; // leave the loop through the brokers
+				} catch (Exception e) {
+					//validates seed brokers in case of a ClosedChannelException
+					validateSeedBrokers(seedBrokers, e);
+					LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
+							"" + e.getClass() + ". Message: " + e.getMessage());
+					LOG.debug("Detailed trace", e);
+					// we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
+					try {
+						Thread.sleep(500);
+					} catch (InterruptedException e1) {
+						// sleep shorter.
+					}
+				} finally {
+					if (consumer != null) {
+						consumer.close();
+					}
+				}
+			} // brokers loop
+		} // retries loop
+		return partitions;
+	}
+
+	/**
+	 * Turn a broker instance into a node instance
+	 * @param broker broker instance
+	 * @return Node representing the given broker
+	 */
+	private static Node brokerToNode(Broker broker) {
+		return new Node(broker.id(), broker.host(), broker.port());
+	}
+
+	/**
+	 * Validate the ZK configuration, checking for required parameters
+	 * @param props Properties to check
+	 */
+	protected static void validateZooKeeperConfig(Properties props) {
+		if (props.getProperty("zookeeper.connect") == null) {
+			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
+		}
+		if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+			throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG
+					+ "' has not been set in the properties");
+		}
+		
+		try {
+			//noinspection ResultOfMethodCallIgnored
+			Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
+		}
+		catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
+		}
+		
+		try {
+			//noinspection ResultOfMethodCallIgnored
+			Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
+		}
+		catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
+		}
+	}
+
+	/**
+	 * Validate that at least one seed broker is valid in case of a
+	 * ClosedChannelException.
+	 * 
+	 * @param seedBrokers
+	 *            array containing the seed brokers e.g. ["host1:port1",
+	 *            "host2:port2"]
+	 * @param exception
+	 *            instance
+	 */
+	private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
+		if (!(exception instanceof ClosedChannelException)) {
+			return;
+		}
+		int unknownHosts = 0;
+		for (String broker : seedBrokers) {
+			URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim());
+			try {
+				InetAddress.getByName(brokerUrl.getHost());
+			} catch (UnknownHostException e) {
+				unknownHosts++;
+			}
+		}
+		// throw meaningful exception if all the provided hosts are invalid
+		if (unknownHosts == seedBrokers.length) {
+			throw new IllegalArgumentException("All the servers provided in: '"
+					+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
+		}
+	}
+
+	private static long getInvalidOffsetBehavior(Properties config) {
+		final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
+		if (val.equals("none")) {
+			throw new IllegalArgumentException("Cannot use '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
+					+ "' value 'none'. Possible values: 'latest', 'largest', or 'earliest'.");
+		}
+		else if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9
+			return OffsetRequest.LatestTime();
+		} else {
+			return OffsetRequest.EarliestTime();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
new file mode 100644
index 0000000..56ccd0b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
+ */
+@Deprecated
+public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T> {
+
+	private static final long serialVersionUID = -5649906773771949146L;
+
+	/**
+	 * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
+	 */
+	@Deprecated
+	public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		super(topic, valueDeserializer, props);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
new file mode 100644
index 0000000..0520336
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
+ */
+@Deprecated
+public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T> {
+
+	private static final long serialVersionUID = -5649906773771949146L;
+
+	/**
+	 * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
+	 */
+	@Deprecated
+	public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		super(topic, valueDeserializer, props);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
new file mode 100644
index 0000000..1c2e0b7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import java.util.Properties;
+
+
+/**
+ * THIS CLASS IS DEPRECATED. Use FlinkKafkaProducer08 instead.
+ */
+@Deprecated
+public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
+
+	@Deprecated
+	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null);
+	}
+
+	@Deprecated
+	public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
+	}
+
+	@Deprecated
+	public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+	}
+
+	@Deprecated
+	public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+		super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+	}
+
+	@Deprecated
+	public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		super(topicId, serializationSchema, producerConfig, null);
+	}
+
+	@Deprecated
+	public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
new file mode 100644
index 0000000..65de5fc
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
+
+	private static final long serialVersionUID = 1L;
+
+	// ------------------- Keyless serialization schema constructors ----------------------
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined (keyless) serialization schema.
+	 */
+	public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined (keyless) serialization schema.
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+	}
+
+	// ------------------- Key/Value serialization schema constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 */
+	public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	}
+
+	@Override
+	protected void flush() {
+		// The Kafka 0.8 producer doesn't support flushing, we wait here
+		// until all pending records are confirmed
+		synchronized (pendingRecordsLock) {
+			while (pendingRecords > 0) {
+				try {
+					pendingRecordsLock.wait();
+				} catch (InterruptedException e) {
+					// this can be interrupted when the Task has been cancelled.
+					// by throwing an exception, we ensure that this checkpoint doesn't get confirmed
+					throw new RuntimeException("Flushing got interrupted while checkpointing", e);
+				}
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
new file mode 100644
index 0000000..b155576
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public class Kafka08JsonTableSink extends KafkaJsonTableSink {
+
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.8
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+		super(topic, properties, partitioner);
+	}
+
+	@Override
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+		return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+	}
+
+	@Override
+	protected Kafka08JsonTableSink createCopy() {
+		return new Kafka08JsonTableSink(topic, properties, partitioner);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
new file mode 100644
index 0000000..63bb57e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.8.
+ */
+public class Kafka08JsonTableSource extends KafkaJsonTableSource {
+
+	/**
+	 * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types.
+	 */
+	public Kafka08JsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		super(topic, properties, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types.
+	 */
+	public Kafka08JsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		super(topic, properties, fieldNames, fieldTypes);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
new file mode 100644
index 0000000..8f51237
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.8.
+ */
+public class Kafka08TableSource extends KafkaTableSource {
+
+	/**
+	 * Creates a Kafka 0.8 {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types.
+	 */
+	public Kafka08TableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a Kafka 0.8 {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types.
+	 */
+	public Kafka08TableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
new file mode 100644
index 0000000..23ff276
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A special form of blocking queue with two additions:
+ * <ol>
+ *     <li>The queue can be closed atomically when empty. Adding elements after the queue
+ *         is closed fails. This allows queue consumers to atomically discover that no elements
+ *         are available and mark themselves as shut down.</li>
+ *     <li>The queue allows to poll batches of elements in one polling call.</li>
+ * </ol>
+ * 
+ * The queue has no capacity restriction and is safe for multiple producers and consumers.
+ * 
+ * <p>Note: Null elements are prohibited.
+ * 
+ * @param <E> The type of elements in the queue.
+ */
+public class ClosableBlockingQueue<E> {
+
+	/** The lock used to make queue accesses and open checks atomic */
+	private final ReentrantLock lock;
+	
+	/** The condition on which blocking get-calls wait if the queue is empty */
+	private final Condition nonEmpty;
+	
+	/** The deque of elements */
+	private final ArrayDeque<E> elements;
+	
+	/** Flag marking the status of the queue */
+	private volatile boolean open;
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new empty queue.
+	 */
+	public ClosableBlockingQueue() {
+		this(10);
+	}
+
+	/**
+	 * Creates a new empty queue, reserving space for at least the specified number
+	 * of elements. The queu can still grow, of more elements are added than the
+	 * reserved space.
+	 * 
+	 * @param initialSize The number of elements to reserve space for.
+	 */
+	public ClosableBlockingQueue(int initialSize) {
+		this.lock = new ReentrantLock(true);
+		this.nonEmpty = this.lock.newCondition();
+		
+		this.elements = new ArrayDeque<>(initialSize);
+		this.open = true;
+		
+		
+	}
+
+	/**
+	 * Creates a new queue that contains the given elements.
+	 * 
+	 * @param initialElements The elements to initially add to the queue.
+	 */
+	public ClosableBlockingQueue(Collection<? extends E> initialElements) {
+		this(initialElements.size());
+		this.elements.addAll(initialElements);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Size and status
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the number of elements currently in the queue.
+	 * @return The number of elements currently in the queue.
+	 */
+	public int size() {
+		lock.lock();
+		try {
+			return elements.size();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Checks whether the queue is empty (has no elements).
+	 * @return True, if the queue is empty; false, if it is non-empty.
+	 */
+	public boolean isEmpty() {
+		return size() == 0;
+	}
+
+	/**
+	 * Checks whether the queue is currently open, meaning elements can be added and polled.
+	 * @return True, if the queue is open; false, if it is closed.
+	 */
+	public boolean isOpen() {
+		return open;
+	}
+	
+	/**
+	 * Tries to close the queue. Closing the queue only succeeds when no elements are
+	 * in the queue when this method is called. Checking whether the queue is empty, and
+	 * marking the queue as closed is one atomic operation.
+	 *
+	 * @return True, if the queue is closed, false if the queue remains open.
+	 */
+	public boolean close() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.isEmpty()) {
+					open = false;
+					nonEmpty.signalAll();
+					return true;
+				} else {
+					return false;
+				}
+			}
+			else {
+				// already closed
+				return true;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Adding / Removing elements
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Tries to add an element to the queue, if the queue is still open. Checking whether the queue
+	 * is open and adding the element is one atomic operation.
+	 * 
+	 * <p>Unlike the {@link #add(Object)} method, this method never throws an exception,
+	 * but only indicates via the return code if the element was added or the
+	 * queue was closed.
+	 * 
+	 * @param element The element to add.
+	 * @return True, if the element was added, false if the queue was closes.
+	 */
+	public boolean addIfOpen(E element) {
+		requireNonNull(element);
+		
+		lock.lock();
+		try {
+			if (open) {
+				elements.addLast(element);
+				if (elements.size() == 1) {
+					nonEmpty.signalAll();
+				}
+			}
+			return open;
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Adds the element to the queue, or fails with an exception, if the queue is closed.
+	 * Checking whether the queue is open and adding the element is one atomic operation.
+	 * 
+	 * @param element The element to add.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public void add(E element) throws IllegalStateException {
+		requireNonNull(element);
+
+		lock.lock();
+		try {
+			if (open) {
+				elements.addLast(element);
+				if (elements.size() == 1) {
+					nonEmpty.signalAll();
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the queue's next element without removing it, if the queue is non-empty.
+	 * Otherwise, returns null. 
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and getting the next element is one atomic operation.
+	 * 
+	 * <p>This method never blocks.
+	 * 
+	 * @return The queue's next element, or null, if the queue is empty.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public E peek() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.size() > 0) {
+					return elements.getFirst();
+				} else {
+					return null;
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the queue's next element and removes it, the queue is non-empty.
+	 * Otherwise, this method returns null. 
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 *
+	 * <p>This method never blocks.
+	 *
+	 * @return The queue's next element, or null, if the queue is empty.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public E poll() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.size() > 0) {
+					return elements.removeFirst();
+				} else {
+					return null;
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns all of the queue's current elements in a list, if the queue is non-empty.
+	 * Otherwise, this method returns null. 
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the elements is one atomic operation.
+	 *
+	 * <p>This method never blocks.
+	 *
+	 * @return All of the queue's elements, or null, if the queue is empty.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public List<E> pollBatch() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.size() > 0) {
+					ArrayList<E> result = new ArrayList<>(elements);
+					elements.clear();
+					return result;
+				} else {
+					return null;
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the next element in the queue. If the queue is empty, this method
+	 * waits until at least one element is added.
+	 * 
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 * 
+	 * @return The next element in the queue, never null.
+	 * 
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public E getElementBlocking() throws InterruptedException {
+		lock.lock();
+		try {
+			while (open && elements.isEmpty()) {
+				nonEmpty.await();
+			}
+			
+			if (open) {
+				return elements.removeFirst();
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the next element in the queue. If the queue is empty, this method
+	 * waits at most a certain time until an element becomes available. If no element
+	 * is available after that time, the method returns null.
+	 * 
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 * 
+	 * @param timeoutMillis The number of milliseconds to block, at most.
+	 * @return The next element in the queue, or null, if the timeout expires  before an element is available.
+	 * 
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public E getElementBlocking(long timeoutMillis) throws InterruptedException {
+		if (timeoutMillis == 0L) {
+			// wait forever case
+			return getElementBlocking();
+		} else if (timeoutMillis < 0L) {
+			throw new IllegalArgumentException("invalid timeout");
+		}
+		
+		final long deadline = System.currentTimeMillis() + timeoutMillis;
+		
+		lock.lock();
+		try {
+			while (open && elements.isEmpty() && timeoutMillis > 0) { 
+				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
+				timeoutMillis = deadline - System.currentTimeMillis();
+			}
+			
+			if (!open) {
+				throw new IllegalStateException("queue is closed");
+			}
+			else if (elements.isEmpty()) {
+				return null;
+			} else {
+				return elements.removeFirst();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Gets all the elements found in the list, or blocks until at least one element
+	 * was added. If the queue is empty when this method is called, it blocks until
+	 * at least one element is added.
+	 *
+	 * <p>This method always returns a list with at least one element.
+	 * 
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 * 
+	 * @return A list with all elements in the queue, always at least one element.
+	 * 
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public List<E> getBatchBlocking() throws InterruptedException {
+		lock.lock();
+		try {
+			while (open && elements.isEmpty()) {
+				nonEmpty.await();
+			}
+			if (open) {
+				ArrayList<E> result = new ArrayList<>(elements);
+				elements.clear();
+				return result;
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Gets all the elements found in the list, or blocks until at least one element
+	 * was added. This method is similar as {@link #getBatchBlocking()}, but takes
+	 * a number of milliseconds that the method will maximally wait before returning.
+	 * 
+	 * <p>This method never returns null, but an empty list, if the queue is empty when
+	 * the method is called and the request times out before an element was added.
+	 * 
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 * 
+	 * @param timeoutMillis The number of milliseconds to wait, at most.
+	 * @return A list with all elements in the queue, possible an empty list.
+	 *
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException {
+		if (timeoutMillis == 0L) {
+			// wait forever case
+			return getBatchBlocking();
+		} else if (timeoutMillis < 0L) {
+			throw new IllegalArgumentException("invalid timeout");
+		}
+
+		final long deadline = System.currentTimeMillis() + timeoutMillis;
+
+		lock.lock();
+		try {
+			while (open && elements.isEmpty() && timeoutMillis > 0) {
+				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
+				timeoutMillis = deadline - System.currentTimeMillis();
+			}
+
+			if (!open) {
+				throw new IllegalStateException("queue is closed");
+			}
+			else if (elements.isEmpty()) {
+				return Collections.emptyList();
+			}
+			else {
+				ArrayList<E> result = new ArrayList<>(elements);
+				elements.clear();
+				return result;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Standard Utilities
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		int hashCode = 17;
+		for (E element : elements) {
+			hashCode = 31 * hashCode + element.hashCode();
+		}
+		return hashCode;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		} else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) {
+			@SuppressWarnings("unchecked")
+			ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj;
+			
+			if (this.elements.size() == that.elements.size()) {
+				Iterator<E> thisElements = this.elements.iterator();
+				for (E thatNext : that.elements) {
+					E thisNext = thisElements.next();
+					if (!(thisNext == null ? thatNext == null : thisNext.equals(thatNext))) {
+						return false;
+					}
+				}
+				return true;
+			} else {
+				return false;
+			}
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return elements.toString();
+	}
+}