You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/10/11 08:05:12 UTC

[2/3] flink git commit: [FLINK-4035] Refactor the Kafka 0.10 connector to be based upon the 0.9 connector

[FLINK-4035] Refactor the Kafka 0.10 connector to be based upon the 0.9 connector

Add a test case for Kafka's new timestamp functionality and update the documentation.

This closes #2369


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6731ec1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6731ec1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6731ec1e

Branch: refs/heads/master
Commit: 6731ec1e48d0a0092dd2330adda73bcf37fda8d7
Parents: 63859c6
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Aug 9 16:38:21 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Oct 11 10:04:25 2016 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  67 +++-
 docs/page/js/flink.js                           |   3 +-
 .../flink-connector-kafka-0.10/pom.xml          |  50 +--
 .../connectors/kafka/FlinkKafkaConsumer010.java | 121 +------
 .../connectors/kafka/FlinkKafkaProducer010.java | 315 +++++++++++++++++--
 .../kafka/Kafka010JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka010TableSource.java   |   2 +-
 .../kafka/internal/Kafka010Fetcher.java         | 268 ++--------------
 .../connectors/kafka/Kafka010ITCase.java        | 266 +++++++++++-----
 .../connectors/kafka/KafkaProducerTest.java     | 119 -------
 .../kafka/KafkaTestEnvironmentImpl.java         |  80 ++++-
 .../kafka/internals/SimpleConsumerThread.java   |   2 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   7 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |   4 +-
 .../connectors/kafka/FlinkKafkaProducer09.java  |   2 +-
 .../kafka/internal/Kafka09Fetcher.java          |  22 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   6 +-
 .../kafka/FlinkKafkaConsumerBase.java           |   4 +
 .../kafka/FlinkKafkaProducerBase.java           |   4 +-
 .../kafka/internals/AbstractFetcher.java        |  43 +--
 ...picPartitionStateWithPeriodicWatermarks.java |   4 +-
 ...cPartitionStateWithPunctuatedWatermarks.java |   4 +-
 .../connectors/kafka/KafkaConsumerTestBase.java | 201 ++++++------
 .../connectors/kafka/KafkaProducerTestBase.java |   5 +-
 .../kafka/KafkaShortRetentionTestBase.java      |   4 +-
 .../connectors/kafka/KafkaTestEnvironment.java  |   7 +-
 .../AbstractFetcherTimestampsTest.java          |  68 ++--
 .../kafka/testutils/DataGenerators.java         |  87 ++---
 .../testutils/JobManagerCommunicationUtils.java |  21 +-
 29 files changed, 936 insertions(+), 852 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index d2221fa..9a360d4 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -46,14 +46,6 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
   </thead>
   <tbody>
     <tr>
-        <td>flink-connector-kafka</td>
-        <td>0.9.1, 0.10</td>
-        <td>FlinkKafkaConsumer082<br>
-        FlinkKafkaProducer</td>
-        <td>0.8.x</td>
-        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
-    </tr>
-     <tr>
         <td>flink-connector-kafka-0.8{{ site.scala_version_suffix }}</td>
         <td>1.0.0</td>
         <td>FlinkKafkaConsumer08<br>
@@ -61,7 +53,7 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
         <td>0.8.x</td>
         <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
     </tr>
-     <tr>
+    <tr>
         <td>flink-connector-kafka-0.9{{ site.scala_version_suffix }}</td>
         <td>1.0.0</td>
         <td>FlinkKafkaConsumer09<br>
@@ -69,6 +61,14 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
         <td>0.9.x</td>
         <td>Uses the new <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer API</a> Kafka.</td>
     </tr>
+    <tr>
+        <td>flink-connector-kafka-0.10{{ site.scala_version_suffix }}</td>
+        <td>1.2.0</td>
+        <td>FlinkKafkaConsumer010<br>
+        FlinkKafkaProducer010</td>
+        <td>0.10.x</td>
+        <td>This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka messages with timestamps</a> both for producing and consuming.</td>
+    </tr>
   </tbody>
 </table>
 
@@ -87,7 +87,6 @@ Note that the streaming connectors are currently not part of the binary distribu
 ### Installing Apache Kafka
 
 * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
-* On 32 bit computers [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur.
 * If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
 
 ### Kafka Consumer
@@ -256,17 +255,28 @@ records to partitions.
 
 Example:
 
+
 <div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<div data-lang="java, Kafka 0.8+" markdown="1">
 {% highlight java %}
 stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
 {% endhighlight %}
 </div>
-<div data-lang="scala" markdown="1">
+<div data-lang="java, Kafka 0.10+" markdown="1">
+{% highlight java %}
+FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+{% endhighlight %}
+</div>
+<div data-lang="scala, Kafka 0.8+" markdown="1">
 {% highlight scala %}
 stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
 {% endhighlight %}
 </div>
+<div data-lang="scala, Kafka 0.10+" markdown="1">
+{% highlight scala %}
+FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+{% endhighlight %}
+</div>
 </div>
 
 You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
@@ -287,3 +297,36 @@ higher value.
 
 There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery
 into a Kafka topic.
+
+### Using Kafka timestamps and Flink event time in Kafka 0.10
+
+Since Apache Kafka 0.10+, Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
+the time the event has occurred (see ["event time" in Apache Flink](../event_time.html)) or the time when the message
+has been written to the Kafka broker.
+
+The `FlinkKafkaConsumer010` will emit records with the timestamp attached, if the time characteristic in Flink is 
+set to `TimeCharacteristic.EventTime` (`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
+
+The Kafka consumer does not emit watermarks. To emit watermarks, use the same mechanism as described above in
+"Kafka Consumers and Timestamp Extraction/Watermark Emission", i.e. the `assignTimestampsAndWatermarks` method.
+
+There is no need to define a timestamp extractor when using the timestamps from Kafka. The `previousElementTimestamp` argument of 
+the `extractTimestamp()` method contains the timestamp carried by the Kafka message.
+
+A timestamp extractor that simply reuses the timestamp carried by the Kafka message would look like this:
+{% highlight java %}
+public long extractTimestamp(Long element, long previousElementTimestamp) {
+    return previousElementTimestamp;
+}
+{% endhighlight %}
+
+
+
+The `FlinkKafkaProducer010` only writes the record timestamp to Kafka if `setWriteTimestampToKafka(true)` is set.
+
+{% highlight java %}
+FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
+config.setWriteTimestampToKafka(true);
+{% endhighlight %}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/docs/page/js/flink.js
----------------------------------------------------------------------
diff --git a/docs/page/js/flink.js b/docs/page/js/flink.js
index fdf972c..885a8ff 100644
--- a/docs/page/js/flink.js
+++ b/docs/page/js/flink.js
@@ -42,6 +42,7 @@ function codeTabs() {
       var image = $(this).data("image");
       var notabs = $(this).data("notabs");
       var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1);
+      lang = lang.replace(/[^a-zA-Z0-9]/g, "_");
       var id = "tab_" + lang + "_" + counter;
       $(this).attr("id", id);
       if (image != null && langImages[lang]) {
@@ -99,9 +100,7 @@ function viewSolution() {
 // A script to fix internal hash links because we have an overlapping top bar.
 // Based on https://github.com/twitter/bootstrap/issues/193#issuecomment-2281510
 function maybeScrollToHash() {
-  console.log("HERE");
   if (window.location.hash && $(window.location.hash).length) {
-    console.log("HERE2", $(window.location.hash), $(window.location.hash).offset().top);
     var newTop = $(window.location.hash).offset().top - 57;
     $(window).scrollTop(newTop);
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
index f2bcb11..0b426b5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -26,7 +26,7 @@ under the License.
 	<parent>
 		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-streaming-connectors</artifactId>
-		<version>1.1-SNAPSHOT</version>
+		<version>1.2-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
 
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<kafka.version>0.10.0.0</kafka.version>
+		<kafka.version>0.10.0.1</kafka.version>
 	</properties>
 
 	<dependencies>
@@ -46,21 +46,16 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 		</dependency>
 
+		<!-- Add Kafka 0.10.x as a dependency -->
+
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_2.10</artifactId>
-			<version>${project.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.kafka</groupId>
-					<artifactId>kafka_${scala.binary.version}</artifactId>
-				</exclusion>
-			</exclusions>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>${kafka.version}</version>
 		</dependency>
 
 		<dependency>
@@ -73,20 +68,29 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<!-- test dependencies -->
+
 		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka-clients</artifactId>
-			<version>${kafka.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<!-- exclude Kafka dependencies -->
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka_${scala.binary.version}</artifactId>
+				</exclusion>
+			</exclusions>
+			<type>test-jar</type>
+			<scope>test</scope>
 		</dependency>
 
-		<!-- test dependencies -->
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka-base_2.10</artifactId>
 			<version>${project.version}</version>
 			<exclusions>
-				<!-- exclude 0.8 dependencies -->
+				<!-- exclude Kafka dependencies -->
 				<exclusion>
 					<groupId>org.apache.kafka</groupId>
 					<artifactId>kafka_${scala.binary.version}</artifactId>
@@ -127,6 +131,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-jmx</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 78ccd4a..267ff57 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -28,20 +28,10 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.SerializedValue;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -64,30 +54,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * is constructed. That means that the client that submits the program needs to be able to
  * reach the Kafka brokers or ZooKeeper.</p>
  */
-public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
+public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 
 	private static final long serialVersionUID = 2324564345203409112L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer010.class);
-
-	/**  Configuration key to change the polling timeout **/
-	public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
-
-	/** Boolean configuration key to disable metrics tracking **/
-	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
-
-	/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
-	 * available. If 0, returns immediately with any records that are available now. */
-	public static final long DEFAULT_POLL_TIMEOUT = 100L;
-
-	// ------------------------------------------------------------------------
-
-	/** User-supplied properties for Kafka **/
-	private final Properties properties;
-
-	/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
-	 * available. If 0, returns immediately with any records that are available now */
-	private final long pollTimeout;
 
 	// ------------------------------------------------------------------------
 
@@ -151,51 +121,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
 	 *           The properties that are used to configure both the fetcher and the offset handler.
 	 */
 	public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
-		super(deserializer);
-
-		checkNotNull(topics, "topics");
-		this.properties = checkNotNull(props, "props");
-		setDeserializer(this.properties);
-
-		// configure the polling timeout
-		try {
-			if (properties.containsKey(KEY_POLL_TIMEOUT)) {
-				this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
-			} else {
-				this.pollTimeout = DEFAULT_POLL_TIMEOUT;
-			}
-		}
-		catch (Exception e) {
-			throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
-		}
-
-		// read the partitions that belong to the listed topics
-		final List<KafkaTopicPartition> partitions = new ArrayList<>();
-
-		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
-			for (final String topic: topics) {
-				// get partitions for each topic
-				List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
-				// for non existing topics, the list might be null.
-				if (partitionsForTopic != null) {
-					partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
-				}
-			}
-		}
-
-		if (partitions.isEmpty()) {
-			throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
-		}
-
-		// we now have a list of partitions which is the same for all parallel consumer instances.
-		LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics);
-
-		if (LOG.isInfoEnabled()) {
-			logPartitionInfo(LOG, partitions);
-		}
-
-		// register these partitions
-		setSubscribedPartitions(partitions);
+		super(topics, deserializer, props);
 	}
 
 	@Override
@@ -212,48 +138,5 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
 				watermarksPeriodic, watermarksPunctuated,
 				runtimeContext, deserializer,
 				properties, pollTimeout, useMetrics);
-		
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities 
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable)
-	 * 
-	 * @param partitions A list of Kafka PartitionInfos.
-	 * @return A list of KafkaTopicPartitions
-	 */
-	private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
-		checkNotNull(partitions);
-
-		List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size());
-		for (PartitionInfo pi : partitions) {
-			ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
-		}
-		return ret;
-	}
-
-	/**
-	 * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
-	 * 
-	 * @param props The Kafka properties to register the serializer in.
-	 */
-	private static void setDeserializer(Properties props) {
-		final String deSerName = ByteArrayDeserializer.class.getCanonicalName();
-
-		Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-		Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-
-		if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
-			LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-		}
-		if (valDeSer != null && !valDeSer.equals(deSerName)) {
-			LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-		}
-
-		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
-		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 49bce39..cc0194b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,27 +17,123 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
 
 import java.util.Properties;
 
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
 
 /**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8.
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
+ *
+ * Implementation note: This producer is a hybrid between a regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * For (b), it extends the StreamSink class.
+ *
+ * Details about approach (a):
  *
- * Please note that this producer does not have any reliability guarantees.
+ *  Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the timestamp attached to the record,
+ *  the Kafka 0.10 producer has a second invocation option, approach (b).
  *
- * @param <IN> Type of the messages to write into Kafka.
+ * Details about approach (b):
+ *  Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
+ *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
+ *  can access the internal record timestamp of the record and write it to Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach they are needed for.
  */
-public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
+public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
+
+	/**
+	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+	 */
+	private boolean writeTimestampToKafka = false;
+
+	// ---------------------- "Constructors" for timestamp writing ------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This factory method allows writing timestamps to Kafka; it follows approach (b) (see above).
+	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined serialization schema supporting key/value messages
+	 * @param producerConfig Properties with the producer configuration.
+	 */
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+	}
+
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This factory method allows writing timestamps to Kafka; it follows approach (b) (see above).
+	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined (keyless) serialization schema.
+	 * @param producerConfig Properties with the producer configuration.
+	 */
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					SerializationSchema<T> serializationSchema,
+																					Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This factory method allows writing timestamps to Kafka; it follows approach (b) (see above).
+	 *
+	 *  @param inStream The stream to write to Kafka
+	 *  @param topicId The name of the target topic
+	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig,
+																					KafkaPartitioner<T> customPartitioner) {
 
-	private static final long serialVersionUID = 1L;
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+	}
 
-	// ------------------- Keyless serialization schema constructors ----------------------
+	// ---------------------- Regular constructors w/o timestamp support  ------------------
 
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
@@ -50,8 +146,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema
 	 * 			User defined (keyless) serialization schema.
 	 */
-	public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+	public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
 	}
 
 	/**
@@ -65,8 +161,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
-	public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
 	}
 
 	/**
@@ -78,9 +174,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
 	 */
-	public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-
 	}
 
 	// ------------------- Key/Value serialization schema constructors ----------------------
@@ -96,8 +191,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema
 	 * 			User defined serialization schema supporting key/value messages
 	 */
-	public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+	public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
 	}
 
 	/**
@@ -111,27 +206,193 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
-	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
+	 * Creates a Kafka producer.
 	 *
-	 * @param topicId The topic to write data to
-	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 * This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
 	 */
-	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
-		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
+		// invoke call.
+		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+	}
+
+
+	// ----------------------------- Generic element processing  ---------------------------
+
+	private void invokeInternal(T next, long elementTimestamp) throws Exception { // shared send path used by both invoke() and processElement()
+
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; // presumably the FlinkKafkaProducer09 handed to super(...) in the constructor
+
+		internalProducer.checkErroneous();
+
+		byte[] serializedKey = internalProducer.schema.serializeKey(next);
+		byte[] serializedValue = internalProducer.schema.serializeValue(next);
+		String targetTopic = internalProducer.schema.getTargetTopic(next);
+		if (targetTopic == null) { // the schema named no topic for this element: fall back to the default topic
+			targetTopic = internalProducer.defaultTopicId;
+		}
+
+		Long timestamp = null; // stays null (no explicit record timestamp) unless writeTimestampToKafka is set
+		if(this.writeTimestampToKafka) {
+			timestamp = elementTimestamp;
+		}
+
+		ProducerRecord<byte[], byte[]> record;
+		if (internalProducer.partitioner == null) { // no custom partitioner: pass null as the partition
+			record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
+		} else {
+			record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
+		}
+		if (internalProducer.flushOnCheckpoint) { // count the record as pending before it is handed to the Kafka producer
+			synchronized (internalProducer.pendingRecordsLock) {
+				internalProducer.pendingRecords++;
+			}
+		}
+		internalProducer.producer.send(record, internalProducer.callback);
+	}
 
+
+	// ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----
+
+
+	// ---- Configuration setters
+
+	/**
+	 * Defines whether the producer should fail on errors, or only log them.
+	 * If this is set to true, then exceptions will be only logged, if set to false,
+	 * exceptions will be eventually thrown and cause the streaming program to
+	 * fail (and enter recovery).
+	 *
+	 * Method is only accessible for approach (a) (see above)
+	 *
+	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+	 */
+	public void setLogFailuresOnly(boolean logFailuresOnly) {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.setLogFailuresOnly(logFailuresOnly); // forwarded unchanged to the wrapped producer
+	}
+
+	/**
+	 * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
+	 * to be acknowledged by the Kafka producer on a checkpoint.
+	 * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
+	 *
+	 * Method is only accessible for approach (a) (see above)
+	 *
+	 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+	 */
+	public void setFlushOnCheckpoint(boolean flush) {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.setFlushOnCheckpoint(flush); // forwarded unchanged to the wrapped producer
+	}
+
+	/**
+	 * This method is used for approach (a) (see above)
+	 *
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.open(parameters); // only the wrapped producer is opened here; no super.open() call is made
+	}
+
+	/**
+	 * This method is used for approach (a) (see above)
+	 */
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		return internalProducer.getIterationRuntimeContext(); // the context is read from the wrapped producer
+	}
+
+	/**
+	 * This method is used for approach (a) (see above)
+	 */
 	@Override
-	protected void flush() {
-		if (this.producer != null) {
-			producer.flush();
+	public void setRuntimeContext(RuntimeContext t) {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.setRuntimeContext(t);
+	}
+
+	/**
+	 * Invoke method for using the Sink as DataStream.addSink() sink.
+	 *
+	 * This method is used for approach (a) (see above)
+	 *
+	 * @param value The input record.
+	 */
+	@Override
+	public void invoke(T value) throws Exception {
+		invokeInternal(value, Long.MAX_VALUE); // no element timestamp is available on this path; Long.MAX_VALUE is passed as a placeholder
+	}
+
+
+	// ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----
+
+
+	/**
+	 * Process method for using the sink with timestamp support.
+	 *
+	 * This method is used for approach (b) (see above)
+	 */
+	@Override
+	public void processElement(StreamRecord<T> element) throws Exception {
+		invokeInternal(element.getValue(), element.getTimestamp()); // unlike invoke(T), the record's own timestamp is passed along
+	}
+
+	/**
+	 * Configuration object returned by the writeToKafkaWithTimestamps() call.
+	 */
+	public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
+
+		private final FlinkKafkaProducerBase wrappedProducerBase; // target of setLogFailuresOnly() and setFlushOnCheckpoint()
+		private final FlinkKafkaProducer010 producer; // target of setWriteTimestampToKafka()
+
+		private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
+			//noinspection unchecked
+			super(stream, producer);
+			this.producer = producer;
+			this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction; // the producer's user function is the wrapped producer base
+		}
+
+		/**
+		 * Defines whether the producer should fail on errors, or only log them.
+		 * If this is set to true, then exceptions will be only logged, if set to false,
+		 * exceptions will be eventually thrown and cause the streaming program to
+		 * fail (and enter recovery).
+		 *
+		 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+		 */
+		public void setLogFailuresOnly(boolean logFailuresOnly) {
+			this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
+		}
+
+		/**
+		 * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
+		 * to be acknowledged by the Kafka producer on a checkpoint.
+		 * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
+		 *
+		 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+		 */
+		public void setFlushOnCheckpoint(boolean flush) {
+			this.wrappedProducerBase.setFlushOnCheckpoint(flush);
+		}
+
+		/**
+		 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+		 * Timestamps must be positive for Kafka to accept them.
+		 *
+		 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+		 */
+		public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+			this.producer.writeTimestampToKafka = writeTimestampToKafka; // this flag is set on the 0.10 producer itself, not on the wrapped base
 		}
 	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index cda68ce..ddf1ad3 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -28,7 +28,7 @@ import java.util.Properties;
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.10.
  */
-public class Kafka010JsonTableSource extends KafkaJsonTableSource {
+public class Kafka010JsonTableSource extends Kafka09JsonTableSource {
 
 	/**
 	 * Creates a Kafka 0.10 JSON {@link StreamTableSource}.

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index cee1b90..732440b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -28,7 +28,7 @@ import java.util.Properties;
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.10.
  */
-public class Kafka010TableSource extends KafkaTableSource {
+public class Kafka010TableSource extends Kafka09TableSource {
 
 	/**
 	 * Creates a Kafka 0.10 {@link StreamTableSource}.

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 70f530b..47bee22 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -18,37 +18,20 @@
 
 package org.apache.flink.streaming.connectors.kafka.internal;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -56,40 +39,7 @@ import java.util.Properties;
  * 
  * @param <T> The type of elements produced by the fetcher.
  */
-public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable {
-
-	private static final Logger LOG = LoggerFactory.getLogger(Kafka010Fetcher.class);
-
-	// ------------------------------------------------------------------------
-
-	/** The schema to convert between Kafka's byte messages, and Flink's objects */
-	private final KeyedDeserializationSchema<T> deserializer;
-
-	/** The subtask's runtime context */
-	private final RuntimeContext runtimeContext;
-
-	/** The configuration for the Kafka consumer */
-	private final Properties kafkaProperties;
-
-	/** The maximum number of milliseconds to wait for a fetch batch */
-	private final long pollTimeout;
-
-	/** Flag whether to register Kafka metrics as Flink accumulators */
-	private final boolean forwardKafkaMetrics;
-
-	/** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */
-	private final Object consumerLock = new Object();
-
-	/** Reference to the Kafka consumer, once it is created */
-	private volatile KafkaConsumer<byte[], byte[]> consumer;
-
-	/** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
-	private volatile ExceptionProxy errorHandler;
-
-	/** Flag to mark the main work loop as alive */
-	private volatile boolean running = true;
-
-	// ------------------------------------------------------------------------
+public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 
 	public Kafka010Fetcher(
 			SourceContext<T> sourceContext,
@@ -100,213 +50,47 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> imple
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
-			boolean forwardKafkaMetrics) throws Exception
+			boolean useMetrics) throws Exception
 	{
-		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
-
-		this.deserializer = deserializer;
-		this.runtimeContext = runtimeContext;
-		this.kafkaProperties = kafkaProperties;
-		this.pollTimeout = pollTimeout;
-		this.forwardKafkaMetrics = forwardKafkaMetrics;
-
-		// if checkpointing is enabled, we are not automatically committing to Kafka.
-		kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-				Boolean.toString(!runtimeContext.isCheckpointingEnabled()));
+		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, deserializer, kafkaProperties, pollTimeout, useMetrics);
 	}
 
-	// ------------------------------------------------------------------------
-	//  Fetcher work methods
-	// ------------------------------------------------------------------------
-
 	@Override
-	public void runFetchLoop() throws Exception {
-		this.errorHandler = new ExceptionProxy(Thread.currentThread());
-
-		// rather than running the main fetch loop directly here, we spawn a dedicated thread
-		// this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
-		Thread runner = new Thread(this, "Kafka 0.10 Fetcher for " + runtimeContext.getTaskNameWithSubtasks());
-		runner.setDaemon(true);
-		runner.start();
-
-		try {
-			runner.join();
-		} catch (InterruptedException e) {
-			// may be the result of a wake-up after an exception. we ignore this here and only
-			// restore the interruption state
-			Thread.currentThread().interrupt();
-		}
-
-		// make sure we propagate any exception that occurred in the concurrent fetch thread,
-		// before leaving this method
-		this.errorHandler.checkAndThrowException();
+	protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
+		consumer.assign(topicPartitions);
 	}
 
 	@Override
-	public void cancel() {
-		// flag the main thread to exit
-		running = false;
-
-		// NOTE:
-		//   - We cannot interrupt the runner thread, because the Kafka consumer may
-		//     deadlock when the thread is interrupted while in certain methods
-		//   - We cannot call close() on the consumer, because it will actually throw
-		//     an exception if a concurrent call is in progress
-
-		// make sure the consumer finds out faster that we are shutting down 
-		if (consumer != null) {
-			consumer.wakeup();
-		}
+	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
+		// get timestamp from provided ConsumerRecord (only possible with kafka 0.10.x)
+		super.emitRecord(record, partition, offset, consumerRecord.timestamp());
 	}
 
+	/**
+	 * Emits the record together with the given Kafka timestamp.
+	 */
 	@Override
-	public void run() {
-		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
-		// This is important, because the consumer has multi-threading issues,
-		// including concurrent 'close()' calls.
-
-		final KafkaConsumer<byte[], byte[]> consumer;
-		try {
-			consumer = new KafkaConsumer<>(kafkaProperties);
-		}
-		catch (Throwable t) {
-			running = false;
-			errorHandler.reportError(t);
-			return;
-		}
-
-		// from here on, the consumer will be closed properly
-		try {
-			consumer.assign(convertKafkaPartitions(subscribedPartitions()));
-
-			// register Kafka metrics to Flink accumulators
-			if (forwardKafkaMetrics) {
-				Map<MetricName, ? extends Metric> metrics = consumer.metrics();
-				if (metrics == null) {
-					// MapR's Kafka implementation returns null here.
-					LOG.info("Consumer implementation does not support metrics");
-				} else {
-					// we have metrics, register them where possible
-					for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
-						String name = "KafkaConsumer-" + metric.getKey().name();
-						DefaultKafkaMetricAccumulator kafkaAccumulator =
-								DefaultKafkaMetricAccumulator.createFor(metric.getValue());
-
-						// best effort: we only add the accumulator if available.
-						if (kafkaAccumulator != null) {
-							runtimeContext.addAccumulator(name, kafkaAccumulator);
-						}
-					}
-				}
-			}
-
-			// seek the consumer to the initial offsets
-			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-				if (partition.isOffsetDefined()) {
-					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
-				}
-			}
-
-			// from now on, external operations may call the consumer
-			this.consumer = consumer;
-
-			// main fetch loop
-			while (running) {
-				// get the next batch of records
-				final ConsumerRecords<byte[], byte[]> records;
-				synchronized (consumerLock) {
-					try {
-						records = consumer.poll(pollTimeout);
-					}
-					catch (WakeupException we) {
-						if (running) {
-							throw we;
-						} else {
-							continue;
-						}
-					}
-				}
-
-				// get the records for each topic partition
-				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-					
-					List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle());
-
-					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-						T value = deserializer.deserialize(
-								record.key(), record.value(),
-								record.topic(), record.partition(), record.offset());
-
-						if (deserializer.isEndOfStream(value)) {
-							// end of stream signaled
-							running = false;
-							break;
-						}
-
-						// emit the actual record. this also update offset state atomically
-						// and deals with timestamps and watermark generation
-						emitRecord(value, partition, record.offset());
-					}
-				}
+	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception {
+		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+			// fast path logic, in case there are no watermarks
+
+			// emit the record, using the checkpoint lock to guarantee
+			// atomicity of record emission and offset state update
+			synchronized (checkpointLock) {
+				sourceContext.collectWithTimestamp(record, timestamp);
+				partitionState.setOffset(offset);
 			}
-			// end main fetch loop
 		}
-		catch (Throwable t) {
-			if (running) {
-				running = false;
-				errorHandler.reportError(t);
-			} else {
-				LOG.debug("Stopped ConsumerThread threw exception", t);
-			}
+		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
 		}
-		finally {
-			try {
-				synchronized (consumerLock) {
-					consumer.close();
-				}
-			} catch (Throwable t) {
-				LOG.warn("Error while closing Kafka 0.10 consumer", t);
-			}
+		else {
+			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
 		}
 	}
 
-	// ------------------------------------------------------------------------
-	//  Kafka 0.10 specific fetcher behavior
-	// ------------------------------------------------------------------------
-
 	@Override
-	public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
-		return new TopicPartition(partition.getTopic(), partition.getPartition());
-	}
-
-	@Override
-	public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
-		KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
-		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
-
-		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
-			Long offset = offsets.get(partition.getKafkaTopicPartition());
-			if (offset != null) {
-				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset, ""));
-			}
-		}
-
-		if (this.consumer != null) {
-			synchronized (consumerLock) {
-				this.consumer.commitSync(offsetsToCommit);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	public static Collection<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
-		ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
-		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
-			result.add(p.getKafkaPartitionHandle());
-		}
-		return result;
+	protected String getFetcherName() {
+		return "Kafka 0.10 Fetcher";
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 5427853..28bf6d5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -17,14 +17,32 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.junit.Test;
 
-import java.util.UUID;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 
 
 public class Kafka010ITCase extends KafkaConsumerTestBase {
@@ -33,10 +51,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 	//  Suite of Tests
 	// ------------------------------------------------------------------------
 
-	@Override
-	public String getExpectedKafkaVersion() {
-		return "0.10";
-	}
 
 	@Test(timeout = 60000)
 	public void testFailOnNoBroker() throws Exception {
@@ -48,16 +62,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		runSimpleConcurrentProducerConsumerTopology();
 	}
 
-//	@Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumer() throws Exception {
-//		runExplicitPunctuatedWMgeneratingConsumerTest(false);
-//	}
-
-//	@Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-//		runExplicitPunctuatedWMgeneratingConsumerTest(true);
-//	}
-
 	@Test(timeout = 60000)
 	public void testKeyValueSupport() throws Exception {
 		runKeyValueTest();
@@ -124,68 +128,168 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
 	@Test(timeout = 60000)
 	public void testMetricsAndEndOfStream() throws Exception {
-		runMetricsAndEndOfStreamTest();
-	}
-
-	@Test
-	public void testJsonTableSource() throws Exception {
-		String topic = UUID.randomUUID().toString();
-
-		// Names and types are determined in the actual test method of the
-		// base test class.
-		Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
-				topic,
-				standardProps,
-				new String[] {
-						"long",
-						"string",
-						"boolean",
-						"double",
-						"missing-field"},
-				new TypeInformation<?>[] {
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.BOOLEAN_TYPE_INFO,
-						BasicTypeInfo.DOUBLE_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO });
-
-		// Don't fail on missing field, but set to null (default)
-		tableSource.setFailOnMissingField(false);
-
-		runJsonTableSource(topic, tableSource);
-	}
-
-	@Test
-	public void testJsonTableSourceWithFailOnMissingField() throws Exception {
-		String topic = UUID.randomUUID().toString();
-
-		// Names and types are determined in the actual test method of the
-		// base test class.
-		Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
-				topic,
-				standardProps,
-				new String[] {
-						"long",
-						"string",
-						"boolean",
-						"double",
-						"missing-field"},
-				new TypeInformation<?>[] {
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.BOOLEAN_TYPE_INFO,
-						BasicTypeInfo.DOUBLE_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO });
-
-		// Don't fail on missing field, but set to null (default)
-		tableSource.setFailOnMissingField(true);
-
-		try {
-			runJsonTableSource(topic, tableSource);
-			fail("Did not throw expected Exception");
-		} catch (Exception e) {
-			Throwable rootCause = e.getCause().getCause().getCause();
-			assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
+		runEndOfStreamTest();
+	}
+
+	/**
+	 * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka
+	 */
+	@Test(timeout = 60000)
+	public void testTimestamps() throws Exception {
+
+		final String topic = "tstopic";
+		createTestTopic(topic, 3, 1);
+
+		// ---------- Produce an event time stream into Kafka -------------------
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
+			boolean running = true;
+
+			@Override
+			public void run(SourceContext<Long> ctx) throws Exception {
+				long i = 0;
+				while(running) {
+					ctx.collectWithTimestamp(i, i*2); // timestamp is twice the value; TimestampValidatingOperator checks exactly this relation
+					if(i++ == 1000L) { // stop after the values 0..1000 have been emitted
+						running = false;
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() {
+			@Override
+			public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+				return (int)(next % 3); // value modulo 3; presumably matches the 3 partitions requested in createTestTopic above
+			}
+		});
+		prod.setParallelism(3);
+		prod.setWriteTimestampToKafka(true); // without this flag the producer sends a null record timestamp
+		env.execute("Produce some");
+
+		// ---------- Consume stream from Kafka -------------------
+
+		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		FlinkKafkaConsumer010<Long> kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps);
+		kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
+			@Nullable
+			@Override
+			public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+				if(lastElement % 10 == 0) { // emit a watermark only for values divisible by 10
+					return new Watermark(lastElement); // note: the watermark carries the element value, not its timestamp
+				}
+				return null;
+			}
+
+			@Override
+			public long extractTimestamp(Long element, long previousElementTimestamp) {
+				return previousElementTimestamp; // keep the timestamp already attached to the record (presumably the one read from Kafka)
+			}
+		});
+
+		DataStream<Long> stream = env.addSource(kafkaSource);
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
+
+		env.execute("Consume again");
+
+		deleteTestTopic(topic);
+	}
+
+	private static class TimestampValidatingOperator extends StreamSink<Long> { // test operator: validates element timestamps and watermarks, failing via RuntimeException
+
+		public TimestampValidatingOperator() {
+			super(new SinkFunction<Long>() {
+				@Override
+				public void invoke(Long value) throws Exception {
+					throw new RuntimeException("Unexpected"); // processElement() below is overridden and never calls this sink function
+				}
+			});
+		}
+
+		long elCount = 0; // number of elements seen
+		long wmCount = 0; // number of watermarks seen
+		long lastWM = Long.MIN_VALUE; // timestamp of the most recent watermark
+
+		@Override
+		public void processElement(StreamRecord<Long> element) throws Exception {
+			elCount++;
+			if(element.getValue() * 2 != element.getTimestamp()) { // the record's timestamp must be exactly twice its value
+				throw new RuntimeException("Invalid timestamp: " + element);
+			}
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			wmCount++;
+
+			if(lastWM <= mark.getTimestamp()) { // watermarks must never decrease
+				lastWM = mark.getTimestamp();
+			} else {
+				throw new RuntimeException("Received watermark lower than the last one"); // this branch means mark < lastWM
+			}
+
+			if( mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE ) { // only multiples of 10 or Long.MAX_VALUE are accepted
+				throw new RuntimeException("Invalid watermark: " + mark.getTimestamp());
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if(elCount != 1000L) { // exactly 1000 elements are expected
+				throw new RuntimeException("Wrong final element count " + elCount);
+			}
+
+			if(wmCount <= 2) { // more than two watermarks must have arrived
+				throw new RuntimeException("Almost no watermarks have been sent " + wmCount);
+			}
+		}
+	}
+
+	private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
+
+		private final TypeInformation<Long> ti;
+		private final TypeSerializer<Long> ser;
+		long cnt = 0; // number of deserialize() calls so far
+
+		public LimitedLongDeserializer() {
+			this.ti = TypeInfoParser.parse("Long");
+			this.ser = ti.createSerializer(new ExecutionConfig());
+		}
+		@Override
+		public TypeInformation<Long> getProducedType() {
+			return ti;
+		}
+
+		@Override
+		public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			cnt++;
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); // only the message value is decoded; key, topic, partition and offset are unused
+			Long e = ser.deserialize(in);
+			return e;
+		}
+
+		@Override
+		public boolean isEndOfStream(Long nextElement) {
+			return cnt > 1000L; // true once more than 1000 messages have been deserialized, regardless of nextElement
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index 5f5ac63..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducerBase.class)
-public class KafkaProducerTest extends TestLogger {
-	
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testPropagateExceptions() {
-		try {
-			// mock kafka producer
-			KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
-			
-			// partition setup
-			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-					Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
-
-			// failure when trying to send an element
-			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
-				.thenAnswer(new Answer<Future<RecordMetadata>>() {
-					@Override
-					public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-						Callback callback = (Callback) invocation.getArguments()[1];
-						callback.onCompletion(null, new Exception("Test error"));
-						return null;
-					}
-				});
-			
-			// make sure the FlinkKafkaProducer instantiates our mock producer
-			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-			
-			// (1) producer that propagates errors
-
-			FlinkKafkaProducer010<String> producerPropagating = new FlinkKafkaProducer010<>(
-					"mock_topic", new SimpleStringSchema(), new Properties(), null);
-
-			producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
-			producerPropagating.open(new Configuration());
-			
-			try {
-				producerPropagating.invoke("value");
-				producerPropagating.invoke("value");
-				fail("This should fail with an exception");
-			}
-			catch (Exception e) {
-				assertNotNull(e.getCause());
-				assertNotNull(e.getCause().getMessage());
-				assertTrue(e.getCause().getMessage().contains("Test error"));
-			}
-
-			// (2) producer that only logs errors
-
-			FlinkKafkaProducer010<String> producerLogging = new FlinkKafkaProducer010<>(
-					"mock_topic", new SimpleStringSchema(), new Properties(), null);
-			producerLogging.setLogFailuresOnly(true);
-			
-			producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
-			producerLogging.open(new Configuration());
-
-			producerLogging.invoke("value");
-			producerLogging.invoke("value");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 45f0478..af6d254 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -28,6 +28,8 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -64,6 +66,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String brokerConnectionString = "";
 	private Properties standardProps;
 	private Properties additionalServerProperties;
+	private boolean secureMode = false;
+	// 6 seconds is the default, which seems to be too small for Travis; use 30 seconds instead.
+	private int zkTimeout = 30000;
 
 	public String getBrokerConnectionString() {
 		return brokerConnectionString;
@@ -75,6 +80,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public Properties getSecureProperties() {
+		Properties prop = new Properties();
+		if(secureMode) {
+			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+			prop.put("security.protocol", "SASL_PLAINTEXT");
+			prop.put("sasl.kerberos.service.name", "kafka");
+
+			//add special timeout for Travis
+			prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+			prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+			prop.setProperty("metadata.fetch.timeout.ms","120000");
+		}
+		return prop;
+	}
+
+	@Override
 	public String getVersion() {
 		return "0.10";
 	}
@@ -90,10 +111,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
-		return prod;
+		return stream.addSink(prod);
+	/*	FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<T> sink = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner);
+		sink.setFlushOnCheckpoint(true);
+		return sink; */
 	}
 
 	@Override
@@ -130,8 +154,21 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties) {
+	public boolean isSecureRunSupported() {
+		return true;
+	}
+
+	@Override
+	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+		//increase the timeout, since on Travis the ZK connection takes a long time for secure connections.
+		if(secureMode) {
+			//run only one Kafka server to avoid multiple ZK connections from many instances (these lead to timeouts on Travis)
+			numKafkaServers = 1;
+			zkTimeout = zkTimeout * 15;
+		}
+
 		this.additionalServerProperties = additionalServerProperties;
+		this.secureMode = secureMode;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
@@ -151,9 +188,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		brokers = null;
 
 		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = new TestingServer(-1, tmpZkDir);
+			zookeeper = new TestingServer(-	1, tmpZkDir);
 			zookeeperConnectionString = zookeeper.getConnectString();
+			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
 			LOG.info("Starting KafkaServer");
 			brokers = new ArrayList<>(numKafkaServers);
@@ -161,8 +198,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			for (int i = 0; i < numKafkaServers; i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
-				SocketServer socketServer = brokers.get(i).socketServer();
-				brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+				if(secureMode) {
+					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+				} else {
+					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+				}
 			}
 
 			LOG.info("ZK and KafkaServer started.");
@@ -177,8 +217,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
 		standardProps.setProperty("group.id", "flink-tests");
 		standardProps.setProperty("auto.commit.enable", "false");
-		standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
-		standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
+		standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
 		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value)
 		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
 	}
@@ -244,7 +284,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		final long deadline = System.currentTimeMillis() + 30000;
 		do {
 			try {
-				Thread.sleep(100);
+				if(secureMode) {
+					//increase the wait time, since ZK timeouts occur frequently on Travis
+					int wait = zkTimeout / 100;
+					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
 			} catch (InterruptedException e) {
 				// restore interrupted state
 			}
@@ -295,8 +342,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
 
 		// for CI stability, increase zookeeper session timeout
-		kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
-		kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
+		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
 		if(additionalServerProperties != null) {
 			kafkaProperties.putAll(additionalServerProperties);
 		}
@@ -306,6 +353,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		for (int i = 1; i <= numTries; i++) {
 			int kafkaPort = NetUtils.getAvailablePort();
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+			//to support secure kafka cluster
+			if(secureMode) {
+				LOG.info("Adding Kafka secure configurations");
+				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.putAll(getSecureProperties());
+			}
+
 			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
 
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 35e491a..1302348 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -376,7 +376,7 @@ class SimpleConsumerThread<T> extends Thread {
 								continue partitionsLoop;
 							}
 							
-							owner.emitRecord(value, currentPartition, offset);
+							owner.emitRecord(value, currentPartition, offset, Long.MIN_VALUE);
 						}
 						else {
 							// no longer running

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index cbf3d06..a0d5002 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -31,6 +31,9 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -101,10 +104,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
-		return prod;
+		return stream.addSink(prod);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 9708777..a97476a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -81,11 +81,11 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 	// ------------------------------------------------------------------------
 
 	/** User-supplied properties for Kafka **/
-	private final Properties properties;
+	protected final Properties properties;
 
 	/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
 	 * available. If 0, returns immediately with any records that are available now */
-	private final long pollTimeout;
+	protected final long pollTimeout;
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index eb3440a..2a3e39d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -27,7 +27,7 @@ import java.util.Properties;
 
 
 /**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8.
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.9.
  *
  * Please note that this producer does not have any reliability guarantees.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index aaec9dc..37e40fc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -131,7 +131,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 
 		// rather than running the main fetch loop directly here, we spawn a dedicated thread
 		// this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
-		Thread runner = new Thread(this, "Kafka 0.9 Fetcher for " + runtimeContext.getTaskNameWithSubtasks());
+		Thread runner = new Thread(this, getFetcherName() + " for " + runtimeContext.getTaskNameWithSubtasks());
 		runner.setDaemon(true);
 		runner.start();
 
@@ -183,7 +183,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 
 		// from here on, the consumer will be closed properly
 		try {
-			consumer.assign(convertKafkaPartitions(subscribedPartitions()));
+			assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions()));
+
 
 			if (useMetrics) {
 				final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
@@ -250,7 +251,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 
 						// emit the actual record. this also update offset state atomically
 						// and deals with timestamps and watermark generation
-						emitRecord(value, partition, record.offset());
+						emitRecord(value, partition, record.offset(), record);
 					}
 				}
 			}
@@ -274,6 +275,21 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		}
 	}
 
+	// Kafka09Fetcher ignores the timestamp; Kafka010Fetcher extracts the timestamp and passes it to the emitRecord() method.
+	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
+		emitRecord(record, partition, offset, Long.MIN_VALUE);
+	}
+	/**
+	 * Protected method to make the partition assignment pluggable, for different Kafka versions.
+	 */
+	protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
+		consumer.assign(topicPartitions);
+	}
+
+	protected String getFetcherName() {
+		return "Kafka 0.9 Fetcher";
+	}
+
 	// ------------------------------------------------------------------------
 	//  Kafka 0.9 specific fetcher behavior
 	// ------------------------------------------------------------------------