You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:28 UTC

[38/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-flume/pom.xml b/flink-connectors/flink-connector-flume/pom.xml
new file mode 100644
index 0000000..64860de
--- /dev/null
+++ b/flink-connectors/flink-connector-flume/pom.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-flume_2.10</artifactId>
+	<name>flink-connector-flume</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<flume-ng.version>1.5.0</flume-ng.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flume</groupId>
+			<artifactId>flume-ng-core</artifactId>
+			<version>${flume-ng.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-io</groupId>
+					<artifactId>commons-io</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-codec</groupId>
+					<artifactId>commons-codec</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-cli</groupId>
+					<artifactId>commons-cli</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-lang</groupId>
+					<artifactId>commons-lang</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.avro</groupId>
+					<artifactId>avro</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jackson</groupId>
+					<artifactId>jackson-core-asl</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jackson</groupId>
+					<artifactId>jackson-mapper-asl</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.thoughtworks.paranamer</groupId>
+					<artifactId>paranamer</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.tukaani</groupId>
+					<artifactId>xz</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.velocity</groupId>
+					<artifactId>velocity</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-collections</groupId>
+					<artifactId>commons-collections</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.code.gson</groupId>
+					<artifactId>gson</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.thrift</groupId>
+					<artifactId>libthrift</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<configuration>
+							<artifactSet>
+								<includes combine.children="append">
+									<!-- We include all dependencies that transitively depend on guava -->
+									<include>org.apache.flume:*</include>
+								</includes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
new file mode 100644
index 0000000..2dc043b
--- /dev/null
+++ b/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sink that serializes records with a {@link SerializationSchema} and sends them as events to Apache Flume.
+ *
+ * @param <IN> The type of the records consumed by this sink
+ */
+public class FlumeSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
+
+	// Connection retry policy: number of attempts and pause (ms) after a failed attempt
+	private static final int MAX_CONNECT_ATTEMPTS = 90;
+	private static final long CONNECT_RETRY_DELAY_MS = 1000L;
+
+	private transient FlinkRpcClientFacade client;
+	boolean initDone = false;
+	String host;
+	int port;
+	SerializationSchema<IN> schema;
+
+	/** Creates a sink that sends to the Flume endpoint at the given host and port. */
+	public FlumeSink(String host, int port, SerializationSchema<IN> schema) {
+		this.host = host;
+		this.port = port;
+		this.schema = schema;
+	}
+
+	/**
+	 * Receives tuples from the Apache Flink {@link DataStream} and forwards
+	 * them to Apache Flume.
+	 *
+	 * @param value The tuple arriving from the datastream
+	 */
+	@Override
+	public void invoke(IN value) {
+		byte[] data = schema.serialize(value);
+		client.sendDataToFlume(data);
+	}
+
+	/** Holds the Flume {@link RpcClient} together with the endpoint it was created for. */
+	private class FlinkRpcClientFacade {
+		private RpcClient client;
+		private String hostname;
+		private int port;
+
+		/**
+		 * Initializes the connection to Apache Flume, retrying failed attempts.
+		 *
+		 * @param hostname The host
+		 * @param port The port.
+		 * @throws RuntimeException if every attempt fails or the thread is interrupted
+		 */
+		public void init(String hostname, int port) {
+			this.hostname = hostname;
+			this.port = port;
+			FlumeException lastFailure = null;
+			for (int attempt = 0; client == null && attempt < MAX_CONNECT_ATTEMPTS; attempt++) {
+				try {
+					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+				} catch (FlumeException e) {
+					lastFailure = e;
+					try {
+						Thread.sleep(CONNECT_RETRY_DELAY_MS);
+					} catch (InterruptedException e1) {
+						// Restore the interrupt flag and stop retrying instead of swallowing it
+						Thread.currentThread().interrupt();
+						throw new RuntimeException("Interrupted while trying to connect to " + hostname + ":" + port, e1);
+					}
+				}
+			}
+			if (client == null) {
+				throw new RuntimeException("Cannot establish connection with " + hostname + ":" + port, lastFailure);
+			}
+			initDone = true;
+		}
+
+		/**
+		 * Sends byte arrays as {@link Event} series to Apache Flume.
+		 *
+		 * @param data The byte array to send to Apache Flume
+		 */
+		public void sendDataToFlume(byte[] data) {
+			Event event = EventBuilder.withBody(data);
+			try {
+				client.append(event);
+			} catch (EventDeliveryException e) {
+				// The event is not re-sent; log the failure, then recreate the client
+				LOG.error("Failed to deliver event to {}:{}, reconnecting", hostname, port, e);
+				client.close();
+				client = null;
+				client = RpcClientFactory.getDefaultInstance(hostname, port);
+			}
+		}
+	}
+
+	@Override
+	public void close() {
+		// open() may not have completed, in which case there is nothing to close
+		if (client != null && client.client != null) {
+			client.client.close();
+		}
+	}
+
+	@Override
+	public void open(Configuration config) {
+		client = new FlinkRpcClientFacade();
+		client.init(host, port);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
new file mode 100644
index 0000000..26352bb
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -0,0 +1,205 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-kafka-0.10_2.10</artifactId>
+	<name>flink-connector-kafka-0.10</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<kafka.version>0.10.0.1</kafka.version>
+	</properties>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka_${scala.binary.version}</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- Add Kafka 0.10.x as a dependency -->
+
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>${kafka.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Optional: projects depending on this module
+			do not transitively depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<!-- exclude Kafka dependencies -->
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka_${scala.binary.version}</artifactId>
+				</exclusion>
+			</exclusions>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-base_2.10</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<!-- exclude Kafka dependencies -->
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka_${scala.binary.version}</artifactId>
+				</exclusion>
+			</exclusions>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<!-- include 0.10 server for tests  -->
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_${scala.binary.version}</artifactId>
+			<version>${kafka.version}</version>
+			<scope>test</scope>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-jmx</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>**/KafkaTestEnvironmentImpl*</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-source-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>attach-test-sources</id>
+						<goals>
+							<goal>test-jar-no-fork</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>**/KafkaTestEnvironmentImpl*</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+					<forkCount>1</forkCount>
+					<argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+	
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
new file mode 100644
index 0000000..a9ce336
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions.
+ *
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once".
+ * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
+ *
+ * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
+ * is constructed. That means that the client that submits the program needs to be able to
+ * reach the Kafka brokers or ZooKeeper.</p>
+ */
+public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
+
+	private static final long serialVersionUID = 2324564345203409112L;
+
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		this(Collections.singletonList(topic), valueDeserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+	 *
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		this(Collections.singletonList(topic), deserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+	 *
+	 * <p>This constructor allows passing multiple topics to the consumer.
+	 *
+	 * @param topics
+	 *           The Kafka topics to read from.
+	 * @param deserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 */
+	public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
+		this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+	 *
+	 * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
+	 *
+	 * @param topics
+	 *           The Kafka topics to read from.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 */
+	public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		super(topics, deserializer, props);
+	}
+
+	@Override
+	protected AbstractFetcher<T, ?> createFetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> thisSubtaskPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext) throws Exception {
+		// Metrics are used unless the KEY_DISABLE_METRICS property is set to "true"
+		boolean useMetrics = !Boolean.parseBoolean(properties.getProperty(KEY_DISABLE_METRICS, "false"));
+
+		return new Kafka010Fetcher<>(
+				sourceContext,
+				thisSubtaskPartitions,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				runtimeContext.getProcessingTimeService(),
+				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+				runtimeContext.getUserCodeClassLoader(),
+				runtimeContext.isCheckpointingEnabled(),
+				runtimeContext.getTaskNameWithSubtasks(),
+				runtimeContext.getMetricGroup(),
+				deserializer,
+				properties,
+				pollTimeout,
+				useMetrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
new file mode 100644
index 0000000..cc0194b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
+ *
+ * Implementation note: This producer is a hybrid between a regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * For (b), it extends the StreamSink class.
+ *
+ * Details about approach (a):
+ *
+ *  Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the timestamp attached to the record,
+ *  the Kafka 0.10 producer has a second invocation option, approach (b).
+ *
+ * Details about approach (b):
+ *  Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
+ *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
+ *  can access the internal record timestamp of the record and write it to Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach they are needed for.
+ */
+public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
+
+	/**
+	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+	 */
+	private boolean writeTimestampToKafka = false;
+
+	// ---------------------- "Constructors" for timestamp writing ------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above)
+	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined serialization schema supporting key/value messages
+	 * @param producerConfig Properties with the producer configuration.
+	 */
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+	}
+
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above)
+	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined (keyless) serialization schema.
+	 * @param producerConfig Properties with the producer configuration.
+	 */
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					SerializationSchema<T> serializationSchema,
+																					Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above)
+	 *
+	 *  @param inStream The stream to write to Kafka
+	 *  @param topicId The name of the target topic
+	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig,
+																					KafkaPartitioner<T> customPartitioner) {
+
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+	}
+
+	// ---------------------- Regular constructors w/o timestamp support  ------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined (keyless) serialization schema.
+	 */
+	public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined (keyless) serialization schema.
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 */
+	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
+	// ------------------- Key/Value serialization schema constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 */
+	public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+	}
+
+	/**
+	 * Create Kafka producer
+	 *
+	 * This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above)
+	 */
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
+		// invoke call.
+		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+	}
+
+
+	// ----------------------------- Generic element processing  ---------------------------
+
+	private void invokeInternal(T next, long elementTimestamp) throws Exception {
+
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+
+		internalProducer.checkErroneous();
+
+		byte[] serializedKey = internalProducer.schema.serializeKey(next);
+		byte[] serializedValue = internalProducer.schema.serializeValue(next);
+		String targetTopic = internalProducer.schema.getTargetTopic(next);
+		if (targetTopic == null) {
+			targetTopic = internalProducer.defaultTopicId;
+		}
+
+		Long timestamp = null;
+		if(this.writeTimestampToKafka) {
+			timestamp = elementTimestamp;
+		}
+
+		ProducerRecord<byte[], byte[]> record;
+		if (internalProducer.partitioner == null) {
+			record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
+		} else {
+			record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
+		}
+		if (internalProducer.flushOnCheckpoint) {
+			synchronized (internalProducer.pendingRecordsLock) {
+				internalProducer.pendingRecords++;
+			}
+		}
+		internalProducer.producer.send(record, internalProducer.callback);
+	}
+
+
+	// ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----
+
+
+	// ---- Configuration setters
+
+	/**
+	 * Defines whether the producer should fail on errors, or only log them.
+	 * If this is set to true, then exceptions will be only logged, if set to false,
+	 * exceptions will be eventually thrown and cause the streaming program to
+	 * fail (and enter recovery).
+	 *
+	 * Method is only accessible for approach (a) (see above)
+	 *
+	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+	 */
+	public void setLogFailuresOnly(boolean logFailuresOnly) {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.setLogFailuresOnly(logFailuresOnly);
+	}
+
+	/**
+	 * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
+	 * to be acknowledged by the Kafka producer on a checkpoint.
+	 * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
+	 *
+	 * Method is only accessible for approach (a) (see above)
+	 *
+	 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+	 */
+	public void setFlushOnCheckpoint(boolean flush) {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.setFlushOnCheckpoint(flush);
+	}
+
+	/**
+	 * This method is used for approach (a) (see above)
+	 *
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.open(parameters);
+	}
+
+	/**
+	 * This method is used for approach (a) (see above)
+	 */
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		return internalProducer.getIterationRuntimeContext();
+	}
+
+	/**
+	 * This method is used for approach (a) (see above)
+	 */
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.setRuntimeContext(t);
+	}
+
+	/**
+	 * Invoke method for using the Sink as DataStream.addSink() sink.
+	 *
+	 * This method is used for approach (a) (see above)
+	 *
+	 * @param value The input record.
+	 */
+	@Override
+	public void invoke(T value) throws Exception {
+		invokeInternal(value, Long.MAX_VALUE);
+	}
+
+
+	// ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----
+
+
+	/**
+	 * Process method for using the sink with timestamp support.
+	 *
+	 * This method is used for approach (b) (see above)
+	 */
+	@Override
+	public void processElement(StreamRecord<T> element) throws Exception {
+		invokeInternal(element.getValue(), element.getTimestamp());
+	}
+
+	/**
+	 * Configuration object returned by the writeToKafkaWithTimestamps() call.
+	 */
+	public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
+
+		private final FlinkKafkaProducerBase wrappedProducerBase;
+		private final FlinkKafkaProducer010 producer;
+
+		private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
+			//noinspection unchecked
+			super(stream, producer);
+			this.producer = producer;
+			this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction;
+		}
+
+		/**
+		 * Defines whether the producer should fail on errors, or only log them.
+		 * If this is set to true, then exceptions will be only logged, if set to false,
+		 * exceptions will be eventually thrown and cause the streaming program to
+		 * fail (and enter recovery).
+		 *
+		 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+		 */
+		public void setLogFailuresOnly(boolean logFailuresOnly) {
+			this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
+		}
+
+		/**
+		 * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
+		 * to be acknowledged by the Kafka producer on a checkpoint.
+		 * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
+		 *
+		 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+		 */
+		public void setFlushOnCheckpoint(boolean flush) {
+			this.wrappedProducerBase.setFlushOnCheckpoint(flush);
+		}
+
+		/**
+		 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+		 * Timestamps must be positive for Kafka to accept them.
+		 *
+		 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+		 */
+		public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+			this.producer.writeTimestampToKafka = writeTimestampToKafka;
+		}
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
new file mode 100644
index 0000000..ddf1ad3
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.10.
+ */
+public class Kafka010JsonTableSource extends Kafka09JsonTableSource {
+
+	/**
+	 * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types.
+	 */
+	public Kafka010JsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		super(topic, properties, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types.
+	 */
+	public Kafka010JsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		super(topic, properties, fieldNames, fieldTypes);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
new file mode 100644
index 0000000..732440b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.10.
+ */
+public class Kafka010TableSource extends Kafka09TableSource {
+
+	/**
+	 * Creates a Kafka 0.10 {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types.
+	 */
+	public Kafka010TableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a Kafka 0.10 {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types.
+	 */
+	public Kafka010TableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
new file mode 100644
index 0000000..71dd29a
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
+ * 
+ * <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally
+ * takes the KafkaRecord-attached timestamp and attaches it to the Flink records.
+ * 
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
+
+	public Kafka010Fetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> assignedPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			ProcessingTimeService processingTimeProvider,
+			long autoWatermarkInterval,
+			ClassLoader userCodeClassLoader,
+			boolean enableCheckpointing,
+			String taskNameWithSubtasks,
+			MetricGroup metricGroup,
+			KeyedDeserializationSchema<T> deserializer,
+			Properties kafkaProperties,
+			long pollTimeout,
+			boolean useMetrics) throws Exception
+	{
+		super(
+				sourceContext,
+				assignedPartitions,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				processingTimeProvider,
+				autoWatermarkInterval,
+				userCodeClassLoader,
+				enableCheckpointing,
+				taskNameWithSubtasks,
+				metricGroup,
+				deserializer,
+				kafkaProperties,
+				pollTimeout,
+				useMetrics);
+	}
+
+	@Override
+	protected void emitRecord(
+			T record,
+			KafkaTopicPartitionState<TopicPartition> partition,
+			long offset,
+			ConsumerRecord<?, ?> consumerRecord) throws Exception {
+
+		// we attach the Kafka 0.10 timestamp here
+		emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp());
+	}
+
+	/**
+	 * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
+	 * changing binary signatures
+	 */
+	@Override
+	protected KafkaConsumerCallBridge010 createCallBridge() {
+		return new KafkaConsumerCallBridge010();
+	}
+
+	@Override
+	protected String getFetcherName() {
+		return "Kafka 0.10 Fetcher";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
new file mode 100644
index 0000000..a81b098
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls the {@link KafkaConsumer#assign(java.util.Collection)} method.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ * changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need two versions whose compiled code goes against different method signatures.
+ */
+public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
+
+	@Override
+	public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
+		consumer.assign(topicPartitions);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
new file mode 100644
index 0000000..6ee0429
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka010Fetcher}.
+ *
+ * <p>All tests substitute a mocked {@link KafkaConsumer} for the one that would otherwise be
+ * constructed (via PowerMock's {@code whenNew}), so no Kafka broker is needed.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Kafka010FetcherTest {
+
+    /**
+     * Checks that {@code commitInternalOffsetsToKafka} returns while the consumer is blocked
+     * inside {@code poll()}.
+     *
+     * <p>The mocked {@code poll()} blocks on a latch that is only released by {@code wakeup()}.
+     * The commit is issued from a separate thread, which must terminate within 30 seconds.
+     *
+     * @throws Exception if the fetcher or the committer thread recorded an unexpected error
+     */
+    @Test
+    public void testCommitDoesNotBlock() throws Exception {
+
+        // test data
+        final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+        final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+        testCommitData.put(testPartition, 11L);
+
+        // to synchronize when the consumer is in its blocking method
+        final OneShotLatch sync = new OneShotLatch();
+
+        // ----- the mock consumer with blocking poll calls ----
+        final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+
+            @Override
+            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+                // signal the test thread that poll() was entered, then block until wakeup()
+                sync.trigger();
+                blockerLatch.await();
+                return ConsumerRecords.empty();
+            }
+        });
+
+        // wakeup() releases the blocked poll() call
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) {
+                blockerLatch.trigger();
+                return null;
+            }
+        }).when(mockConsumer).wakeup();
+
+        // make sure the fetcher creates the mock consumer
+        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+        // ----- create the test fetcher -----
+
+        @SuppressWarnings("unchecked")
+        SourceContext<String> sourceContext = mock(SourceContext.class);
+        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+                sourceContext,
+                topics,
+                null, /* periodic assigner */
+                null, /* punctuated assigner */
+                new TestProcessingTimeService(),
+                10,
+                getClass().getClassLoader(),
+                false, /* checkpointing */
+                "taskname-with-subtask",
+                new UnregisteredMetricsGroup(),
+                schema,
+                new Properties(),
+                0L,
+                false);
+
+        // ----- run the fetcher -----
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final Thread fetcherRunner = new Thread("fetcher runner") {
+
+            @Override
+            public void run() {
+                try {
+                    fetcher.runFetchLoop();
+                } catch (Throwable t) {
+                    error.set(t);
+                }
+            }
+        };
+        fetcherRunner.start();
+
+        // wait until the fetcher has reached the method of interest
+        sync.await();
+
+        // ----- trigger the offset commit -----
+
+        final AtomicReference<Throwable> commitError = new AtomicReference<>();
+        final Thread committer = new Thread("committer runner") {
+            @Override
+            public void run() {
+                try {
+                    fetcher.commitInternalOffsetsToKafka(testCommitData);
+                } catch (Throwable t) {
+                    commitError.set(t);
+                }
+            }
+        };
+        committer.start();
+
+        // ----- ensure that the committer finishes in time  -----
+        committer.join(30000);
+        assertFalse("The committer did not finish in time", committer.isAlive());
+
+        // ----- test done, wait till the fetcher is done for a clean shutdown -----
+        fetcher.cancel();
+        fetcherRunner.join();
+
+        // check that there were no errors in the fetcher
+        // (a Handover.ClosedException is tolerated, every other error fails the test)
+        final Throwable fetcherError = error.get();
+        if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
+            throw new Exception("Exception in the fetcher", fetcherError);
+        }
+        final Throwable committerError = commitError.get();
+        if (committerError != null) {
+            throw new Exception("Exception in the committer", committerError);
+        }
+    }
+
+    /**
+     * Checks that offsets passed to {@code commitInternalOffsetsToKafka} arrive at the consumer's
+     * {@code commitAsync} call, and that the committed offset is the given offset plus one.
+     *
+     * <p>The mocked {@code commitAsync} stores every received offset map in a queue and
+     * immediately completes the callback; the test takes the maps from that queue.
+     *
+     * @throws Exception if an assertion fails or the fetcher thread recorded an unexpected error
+     */
+    @Test
+    public void ensureOffsetsGetCommitted() throws Exception {
+
+        // test data
+        final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+        final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+        final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+        testCommitData1.put(testPartition1, 11L);
+        testCommitData1.put(testPartition2, 18L);
+
+        final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+        testCommitData2.put(testPartition1, 19L);
+        testCommitData2.put(testPartition2, 28L);
+
+        final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+
+        // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+        final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+            @Override
+            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+                blockerLatch.await();
+                return ConsumerRecords.empty();
+            }
+        });
+
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) {
+                blockerLatch.trigger();
+                return null;
+            }
+        }).when(mockConsumer).wakeup();
+
+        // record every commit and report it as successful (null exception) to the callback
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) {
+                @SuppressWarnings("unchecked")
+                Map<TopicPartition, OffsetAndMetadata> offsets =
+                        (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+                OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+                commitStore.add(offsets);
+                callback.onComplete(offsets, null);
+
+                return null;
+            }
+        }).when(mockConsumer).commitAsync(
+                Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+        // make sure the fetcher creates the mock consumer
+        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+        // ----- create the test fetcher -----
+
+        @SuppressWarnings("unchecked")
+        SourceContext<String> sourceContext = mock(SourceContext.class);
+        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+                sourceContext,
+                topics,
+                null, /* periodic assigner */
+                null, /* punctuated assigner */
+                new TestProcessingTimeService(),
+                10,
+                getClass().getClassLoader(),
+                false, /* checkpointing */
+                "taskname-with-subtask",
+                new UnregisteredMetricsGroup(),
+                schema,
+                new Properties(),
+                0L,
+                false);
+
+
+        // ----- run the fetcher -----
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final Thread fetcherRunner = new Thread("fetcher runner") {
+
+            @Override
+            public void run() {
+                try {
+                    fetcher.runFetchLoop();
+                } catch (Throwable t) {
+                    error.set(t);
+                }
+            }
+        };
+        fetcherRunner.start();
+
+        // ----- trigger the first offset commit -----
+
+        fetcher.commitInternalOffsetsToKafka(testCommitData1);
+        Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+        for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+            TopicPartition partition = entry.getKey();
+            if (partition.topic().equals("test")) {
+                assertEquals(42, partition.partition());
+                // the committed offset is the given offset (11) plus one
+                assertEquals(12L, entry.getValue().offset());
+            }
+            else if (partition.topic().equals("another")) {
+                assertEquals(99, partition.partition());
+                // same "given offset + 1" expectation as for the "test" partition (18 + 1)
+                // NOTE(review): the fetcher is only created for ("test", 42), so this branch is
+                // presumably never reached - confirm
+                assertEquals(19L, entry.getValue().offset());
+            }
+        }
+
+        // ----- trigger the second offset commit -----
+
+        fetcher.commitInternalOffsetsToKafka(testCommitData2);
+        Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+        for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+            TopicPartition partition = entry.getKey();
+            if (partition.topic().equals("test")) {
+                assertEquals(42, partition.partition());
+                // the committed offset is the given offset (19) plus one
+                assertEquals(20L, entry.getValue().offset());
+            }
+            else if (partition.topic().equals("another")) {
+                assertEquals(99, partition.partition());
+                // same "given offset + 1" expectation as for the "test" partition (28 + 1)
+                assertEquals(29L, entry.getValue().offset());
+            }
+        }
+
+        // ----- test done, wait till the fetcher is done for a clean shutdown -----
+        fetcher.cancel();
+        fetcherRunner.join();
+
+        // check that there were no errors in the fetcher
+        final Throwable caughtError = error.get();
+        if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
+            throw new Exception("Exception in the fetcher", caughtError);
+        }
+    }
+
+    /**
+     * Checks that cancelling the fetcher and interrupting its thread leaves no thread blocked
+     * in the source context.
+     *
+     * <p>The mocked consumer always returns three records, and the {@link BlockingSourceContext}
+     * blocks on every emit call until the emitting thread is interrupted.
+     *
+     * @throws Exception if the test thread is interrupted or the assertion fails
+     */
+    @Test
+    public void testCancellationWhenEmitBlocks() throws Exception {
+
+        // ----- some test data -----
+
+        final String topic = "test-topic";
+        final int partition = 3;
+        final byte[] payload = new byte[] {1, 2, 3, 4};
+
+        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+        data.put(new TopicPartition(topic, partition), records);
+
+        final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+        // ----- the test consumer -----
+
+        final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+            @Override
+            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+                return consumerRecords;
+            }
+        });
+
+        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+        // ----- build a fetcher -----
+
+        BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+                sourceContext,
+                topics,
+                null, /* periodic watermark extractor */
+                null, /* punctuated watermark extractor */
+                new TestProcessingTimeService(),
+                10, /* watermark interval */
+                this.getClass().getClassLoader(),
+                true, /* checkpointing */
+                "task_name",
+                new UnregisteredMetricsGroup(),
+                schema,
+                new Properties(),
+                0L,
+                false);
+
+
+        // ----- run the fetcher -----
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final Thread fetcherRunner = new Thread("fetcher runner") {
+
+            @Override
+            public void run() {
+                try {
+                    fetcher.runFetchLoop();
+                } catch (Throwable t) {
+                    error.set(t);
+                }
+            }
+        };
+        fetcherRunner.start();
+
+        // wait until the thread started to emit records to the source context
+        sourceContext.waitTillHasBlocker();
+
+        // now we try to cancel the fetcher, including the interruption usually done on the task thread
+        // once it has finished, there must be no more thread blocked on the source context
+        fetcher.cancel();
+        fetcherRunner.interrupt();
+        fetcherRunner.join();
+
+        assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * A source context whose emit methods block the calling thread until it is interrupted.
+     *
+     * <p>A blocked thread holds {@code lock} for as long as it is blocked, which lets the test
+     * ask via {@link #isStillBlocking()} whether any thread is still stuck in an emit call.
+     *
+     * @param <T> the type of the emitted elements
+     */
+    private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+        /** Held by a thread while it is blocked in {@link #block()}. */
+        private final ReentrantLock lock = new ReentrantLock();
+
+        /** Triggered as soon as the first thread has entered {@link #block()}. */
+        private final OneShotLatch inBlocking = new OneShotLatch();
+
+        @Override
+        public void collect(T element) {
+            block();
+        }
+
+        @Override
+        public void collectWithTimestamp(T element, long timestamp) {
+            block();
+        }
+
+        @Override
+        public void emitWatermark(Watermark mark) {
+            block();
+        }
+
+        /** Returns a fresh object on every call, so callers never contend on this lock. */
+        @Override
+        public Object getCheckpointLock() {
+            return new Object();
+        }
+
+        @Override
+        public void close() {}
+
+        /**
+         * Blocks the caller until some thread has entered an emit method.
+         *
+         * @throws InterruptedException if the waiting thread is interrupted
+         */
+        public void waitTillHasBlocker() throws InterruptedException {
+            inBlocking.await();
+        }
+
+        /**
+         * Reports whether a thread currently holds the lock, i.e. is still inside {@link #block()}.
+         *
+         * @return {@code true} if the lock is held by any thread
+         */
+        public boolean isStillBlocking() {
+            return lock.isLocked();
+        }
+
+        /**
+         * Takes the lock, announces that a thread is blocked, and then waits until interrupted.
+         * The lock is released on the way out and the interruption flag is restored.
+         */
+        @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+        private void block() {
+            lock.lock();
+            try {
+                inBlocking.trigger();
+
+                // put this thread to sleep indefinitely
+                final Object o = new Object();
+                while (true) {
+                    synchronized (o) {
+                        o.wait();
+                    }
+                }
+            }
+            catch (InterruptedException e) {
+                // exit cleanly, simply reset the interruption flag
+                Thread.currentThread().interrupt();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
new file mode 100644
index 0000000..08511c9
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+
+/**
+ * Integration tests for the Kafka 0.10 consumer and producer.
+ *
+ * <p>Most tests only delegate to the corresponding {@code run...} method inherited from
+ * {@link KafkaConsumerTestBase}; {@link #testTimestamps()} is specific to Kafka 0.10.
+ */
+public class Kafka010ITCase extends KafkaConsumerTestBase {
+
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	// ------------------------------------------------------------------------
+
+
+	@Test(timeout = 60000)
+	public void testFailOnNoBroker() throws Exception {
+		runFailOnNoBrokerTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testConcurrentProducerConsumerTopology() throws Exception {
+		runSimpleConcurrentProducerConsumerTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testKeyValueSupport() throws Exception {
+		runKeyValueTest();
+	}
+
+	// --- canceling / failures ---
+
+	@Test(timeout = 60000)
+	public void testCancelingEmptyTopic() throws Exception {
+		runCancelingOnEmptyInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testCancelingFullTopic() throws Exception {
+		runCancelingOnFullInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testFailOnDeploy() throws Exception {
+		runFailOnDeployTest();
+	}
+
+
+	// --- source to partition mappings and exactly once ---
+
+	@Test(timeout = 60000)
+	public void testOneToOneSources() throws Exception {
+		runOneToOneExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testOneSourceMultiplePartitions() throws Exception {
+		runOneSourceMultiplePartitionsExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleSourcesOnePartition() throws Exception {
+		runMultipleSourcesOnePartitionExactlyOnceTest();
+	}
+
+	// --- broker failure ---
+
+	@Test(timeout = 60000)
+	public void testBrokerFailure() throws Exception {
+		runBrokerFailureTest();
+	}
+
+	// --- special executions ---
+
+	@Test(timeout = 60000)
+	public void testBigRecordJob() throws Exception {
+		runBigRecordTestTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+	@Test(timeout = 60000)
+	public void testAllDeletes() throws Exception {
+		runAllDeletesTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMetricsAndEndOfStream() throws Exception {
+		runEndOfStreamTest();
+	}
+
+	// --- offset committing ---
+
+	@Test(timeout = 60000)
+	public void testCommitOffsetsToKafka() throws Exception {
+		runCommitOffsetsToKafka();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromKafkaCommitOffsets() throws Exception {
+		runStartFromKafkaCommitOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+		runAutoOffsetRetrievalAndCommitToKafka();
+	}
+
+	/**
+	 * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka.
+	 *
+	 * <p>A first job writes the values 0 to 1000 (inclusive) with timestamp {@code value * 2} into
+	 * a topic with three partitions. A second job reads the topic again and validates, in a
+	 * {@link TimestampValidatingOperator}, the timestamps and watermarks it receives.
+	 *
+	 * @throws Exception if one of the two jobs fails
+	 */
+	@Test(timeout = 60000)
+	public void testTimestamps() throws Exception {
+
+		final String topic = "tstopic";
+		createTestTopic(topic, 3, 1);
+
+		// ---------- Produce an event time stream into Kafka -------------------
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
+			// volatile: cancel() writes this flag while run() reads it in its loop
+			private volatile boolean running = true;
+
+			@Override
+			public void run(SourceContext<Long> ctx) throws Exception {
+				long i = 0;
+				while(running) {
+					ctx.collectWithTimestamp(i, i*2);
+					// stop after the element with value 1000 has been emitted
+					if(i++ == 1000L) {
+						running = false;
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() {
+			@Override
+			public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+				// spread the values over the three partitions the topic was created with
+				return (int)(next % 3);
+			}
+		});
+		prod.setParallelism(3);
+		prod.setWriteTimestampToKafka(true);
+		env.execute("Produce some");
+
+		// ---------- Consume stream from Kafka -------------------
+
+		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		FlinkKafkaConsumer010<Long> kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps);
+		kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
+			@Nullable
+			@Override
+			public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+				// emit a watermark for every element whose value is a multiple of ten
+				if(lastElement % 10 == 0) {
+					return new Watermark(lastElement);
+				}
+				return null;
+			}
+
+			@Override
+			public long extractTimestamp(Long element, long previousElementTimestamp) {
+				// keep the timestamp that is already attached to the element
+				return previousElementTimestamp;
+			}
+		});
+
+		DataStream<Long> stream = env.addSource(kafkaSource);
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
+
+		env.execute("Consume again");
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Operator that validates the timestamps of the elements and the watermarks it receives.
+	 *
+	 * <p>It fails if an element's timestamp is not twice its value, if a watermark is lower than
+	 * the previous one, if a watermark is neither a multiple of ten nor {@code Long.MAX_VALUE},
+	 * or if on close it has not seen exactly 1000 elements and more than two watermarks.
+	 */
+	private static class TimestampValidatingOperator extends StreamSink<Long> {
+
+		public TimestampValidatingOperator() {
+			// the sink function must never be called, since processElement is overridden below
+			super(new SinkFunction<Long>() {
+				@Override
+				public void invoke(Long value) throws Exception {
+					throw new RuntimeException("Unexpected");
+				}
+			});
+		}
+
+		/** Number of elements seen so far. */
+		long elCount = 0;
+		/** Number of watermarks seen so far. */
+		long wmCount = 0;
+		/** Timestamp of the most recent watermark. */
+		long lastWM = Long.MIN_VALUE;
+
+		@Override
+		public void processElement(StreamRecord<Long> element) throws Exception {
+			elCount++;
+			if(element.getValue() * 2 != element.getTimestamp()) {
+				throw new RuntimeException("Invalid timestamp: " + element);
+			}
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			wmCount++;
+
+			// watermarks must never go backwards
+			if(lastWM <= mark.getTimestamp()) {
+				lastWM = mark.getTimestamp();
+			} else {
+				throw new RuntimeException("Received watermark lower than the last one");
+			}
+
+			if( mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE ) {
+				throw new RuntimeException("Invalid watermark: " + mark.getTimestamp());
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if(elCount != 1000L) {
+				throw new RuntimeException("Wrong final element count " + elCount);
+			}
+
+			if(wmCount <= 2) {
+				throw new RuntimeException("Almost no watermarks have been sent " + wmCount);
+			}
+		}
+	}
+
+	/**
+	 * Deserialization schema for {@code Long} values that reports the end of the stream once
+	 * more than 1000 records have been deserialized.
+	 */
+	private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
+
+		private final TypeInformation<Long> ti;
+		private final TypeSerializer<Long> ser;
+		/** Number of records deserialized so far. */
+		private long cnt = 0;
+
+		public LimitedLongDeserializer() {
+			this.ti = TypeInfoParser.parse("Long");
+			this.ser = ti.createSerializer(new ExecutionConfig());
+		}
+
+		@Override
+		public TypeInformation<Long> getProducedType() {
+			return ti;
+		}
+
+		@Override
+		public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			cnt++;
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			return ser.deserialize(in);
+		}
+
+		@Override
+		public boolean isEndOfStream(Long nextElement) {
+			return cnt > 1000L;
+		}
+	}
+
+}