You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:23 UTC

[33/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
new file mode 100644
index 0000000..58eb043
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -0,0 +1,212 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-kafka-base_2.10</artifactId>
+	<name>flink-connector-kafka-base</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<kafka.version>0.8.2.2</kafka.version>
+	</properties>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project,
+			won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_${scala.binary.version}</artifactId>
+			<version>${kafka.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>net.sf.jopt-simple</groupId>
+					<artifactId>jopt-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-reflect</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.yammer.metrics</groupId>
+					<artifactId>metrics-annotation</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- test dependencies -->
+		
+		<!-- force using the latest zkclient -->
+		<dependency>
+			<groupId>com.101tec</groupId>
+			<artifactId>zkclient</artifactId>
+			<version>0.7</version>
+			<type>jar</type>
+			<scope>test</scope>
+		</dependency>
+
+
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-jmx</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>com.101tec</groupId>
+				<artifactId>zkclient</artifactId>
+				<version>0.7</version>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
+	
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
+		</plugins>
+	</build>
+	
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
new file mode 100644
index 0000000..aef7116
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class of all Flink Kafka Consumer data sources.
+ * This implements the common behavior across all Kafka versions.
+ * 
+ * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
+ * {@link AbstractFetcher}.
+ * 
+ * @param <T> The type of records produced by this data source
+ */
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
+		CheckpointListener,
+		ResultTypeQueryable<T>,
+		CheckpointedFunction {
+	private static final long serialVersionUID = -6272159445203409112L;
+
+	protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+	
+	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
+	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+	/** Boolean configuration key to disable metrics tracking **/
+	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+	// ------------------------------------------------------------------------
+	//  configuration state, set on the client relevant for all subtasks
+	// ------------------------------------------------------------------------
+
+	private final List<String> topics;
+	
+	/** The schema to convert between Kafka's byte messages, and Flink's objects */
+	protected final KeyedDeserializationSchema<T> deserializer;
+
+	/** The set of topic partitions that the source will read */
+	protected List<KafkaTopicPartition> subscribedPartitions;
+	
+	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+	 * to exploit per-partition timestamp characteristics.
+	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
+	private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
+	
+	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+	 * to exploit per-partition timestamp characteristics. 
+	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
+	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
+
+	private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
+
+	// ------------------------------------------------------------------------
+	//  runtime state (used individually by each parallel subtask) 
+	// ------------------------------------------------------------------------
+	
+	/** Data for pending but uncommitted offsets */
+	private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+	/** The fetcher implements the connections to the Kafka brokers */
+	private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
+	
+	/** The offsets to restore to, if the consumer restores state from a checkpoint */
+	private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;
+	
+	/** Flag indicating whether the consumer is still running **/
+	private volatile boolean running = true;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Base constructor.
+	 *
+	 * @param deserializer
+	 *           The deserializer to turn raw byte messages into Java/Scala objects.
+	 */
+	public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
+		this.topics = checkNotNull(topics);
+		checkArgument(topics.size() > 0, "You have to define at least one topic.");
+		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
+	}
+
+	/**
+	 * This method must be called from the subclasses, to set the list of all subscribed partitions
+	 * that this consumer will fetch from (across all subtasks).
+	 * 
+	 * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from.
+	 */
+	protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
+		checkNotNull(allSubscribedPartitions);
+		this.subscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Configuration
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
+	 * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
+	 * in the same way as in the Flink runtime, when streams are merged.
+	 * 
+	 * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+	 * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
+	 * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
+	 * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
+	 * parallel source subtask reads more that one partition.
+	 * 
+	 * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
+	 * partition, allows users to let them exploit the per-partition characteristics.
+	 * 
+	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
+	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+	 * 
+	 * @param assigner The timestamp assigner / watermark generator to use.
+	 * @return The consumer object, to allow function chaining.   
+	 */
+	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
+		checkNotNull(assigner);
+		
+		if (this.periodicWatermarkAssigner != null) {
+			throw new IllegalStateException("A periodic watermark emitter has already been set.");
+		}
+		try {
+			ClosureCleaner.clean(assigner, true);
+			this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
+			return this;
+		} catch (Exception e) {
+			throw new IllegalArgumentException("The given assigner is not serializable", e);
+		}
+	}
+
+	/**
+	 * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
+	 * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
+	 * in the same way as in the Flink runtime, when streams are merged.
+	 *
+	 * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+	 * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
+	 * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
+	 * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
+	 * parallel source subtask reads more that one partition.
+	 *
+	 * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
+	 * partition, allows users to let them exploit the per-partition characteristics.
+	 *
+	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
+	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+	 *
+	 * @param assigner The timestamp assigner / watermark generator to use.
+	 * @return The consumer object, to allow function chaining.   
+	 */
+	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
+		checkNotNull(assigner);
+		
+		if (this.punctuatedWatermarkAssigner != null) {
+			throw new IllegalStateException("A punctuated watermark emitter has already been set.");
+		}
+		try {
+			ClosureCleaner.clean(assigner, true);
+			this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
+			return this;
+		} catch (Exception e) {
+			throw new IllegalArgumentException("The given assigner is not serializable", e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Work methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void run(SourceContext<T> sourceContext) throws Exception {
+		if (subscribedPartitions == null) {
+			throw new Exception("The partitions were not set for the consumer");
+		}
+
+		// we need only do work, if we actually have partitions assigned
+		if (!subscribedPartitions.isEmpty()) {
+
+			// (1) create the fetcher that will communicate with the Kafka brokers
+			final AbstractFetcher<T, ?> fetcher = createFetcher(
+					sourceContext, subscribedPartitions,
+					periodicWatermarkAssigner, punctuatedWatermarkAssigner,
+					(StreamingRuntimeContext) getRuntimeContext());
+
+			// (2) set the fetcher to the restored checkpoint offsets
+			if (restoreToOffset != null) {
+				fetcher.restoreOffsets(restoreToOffset);
+			}
+
+			// publish the reference, for snapshot-, commit-, and cancel calls
+			// IMPORTANT: We can only do that now, because only now will calls to
+			//            the fetchers 'snapshotCurrentState()' method return at least
+			//            the restored offsets
+			this.kafkaFetcher = fetcher;
+			if (!running) {
+				return;
+			}
+			
+			// (3) run the fetcher' main work method
+			fetcher.runFetchLoop();
+		}
+		else {
+			// this source never completes, so emit a Long.MAX_VALUE watermark
+			// to not block watermark forwarding
+			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+
+			// wait until this is canceled
+			final Object waitLock = new Object();
+			while (running) {
+				try {
+					//noinspection SynchronizationOnLocalVariableOrMethodParameter
+					synchronized (waitLock) {
+						waitLock.wait();
+					}
+				}
+				catch (InterruptedException e) {
+					if (!running) {
+						// restore the interrupted state, and fall through the loop
+						Thread.currentThread().interrupt();
+					}
+				}
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		// set ourselves as not running
+		running = false;
+		
+		// abort the fetcher, if there is one
+		if (kafkaFetcher != null) {
+			kafkaFetcher.cancel();
+		}
+
+		// there will be an interrupt() call to the main thread anyways
+	}
+
+	@Override
+	public void open(Configuration configuration) {
+		List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
+
+		if (kafkaTopicPartitions != null) {
+			assignTopicPartitions(kafkaTopicPartitions);
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		// pretty much the same logic as cancelling
+		try {
+			cancel();
+		} finally {
+			super.close();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Checkpoint and restore
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+
+		OperatorStateStore stateStore = context.getOperatorStateStore();
+		offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+
+		if (context.isRestored()) {
+			restoreToOffset = new HashMap<>();
+			for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
+				restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+			}
+
+			LOG.info("Setting restore state in the FlinkKafkaConsumer.");
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Using the following offsets: {}", restoreToOffset);
+			}
+		} else {
+			LOG.info("No restore state for FlinkKafkaConsumer.");
+		}
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		if (!running) {
+			LOG.debug("snapshotState() called on closed source");
+		} else {
+
+			offsetsStateForCheckpoint.clear();
+
+			final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+			if (fetcher == null) {
+				// the fetcher has not yet been initialized, which means we need to return the
+				// originally restored offsets or the assigned partitions
+
+				if (restoreToOffset != null) {
+
+					for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
+						offsetsStateForCheckpoint.add(
+								Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+					}
+				} else if (subscribedPartitions != null) {
+					for (KafkaTopicPartition subscribedPartition : subscribedPartitions) {
+						offsetsStateForCheckpoint.add(
+								Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
+					}
+				}
+
+				// the map cannot be asynchronously updated, because only one checkpoint call can happen
+				// on this function at a time: either snapshotState() or notifyCheckpointComplete()
+				pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
+			} else {
+				HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
+
+				// the map cannot be asynchronously updated, because only one checkpoint call can happen
+				// on this function at a time: either snapshotState() or notifyCheckpointComplete()
+				pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+
+				for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
+					offsetsStateForCheckpoint.add(
+							Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+				}
+			}
+
+			// truncate the map of pending offsets to commit, to prevent infinite growth
+			while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+				pendingOffsetsToCommit.remove(0);
+			}
+		}
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		if (!running) {
+			LOG.debug("notifyCheckpointComplete() called on closed source");
+			return;
+		}
+
+		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+		if (fetcher == null) {
+			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+			return;
+		}
+		
+		// only one commit operation must be in progress
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
+		}
+
+		try {
+			final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+			if (posInMap == -1) {
+				LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+				return;
+			}
+
+			@SuppressWarnings("unchecked")
+			HashMap<KafkaTopicPartition, Long> offsets =
+					(HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
+
+			// remove older checkpoints in map
+			for (int i = 0; i < posInMap; i++) {
+				pendingOffsetsToCommit.remove(0);
+			}
+
+			if (offsets == null || offsets.size() == 0) {
+				LOG.debug("Checkpoint state was empty.");
+				return;
+			}
+			fetcher.commitInternalOffsetsToKafka(offsets);
+		}
+		catch (Exception e) {
+			if (running) {
+				throw e;
+			}
+			// else ignore exception if we are no longer running
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Kafka Consumer specific methods
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Creates the fetcher that connect to the Kafka brokers, pulls data, deserialized the
+	 * data, and emits it into the data streams.
+	 * 
+	 * @param sourceContext The source context to emit data to.
+	 * @param thisSubtaskPartitions The set of partitions that this subtask should handle.
+	 * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
+	 * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
+	 * @param runtimeContext The task's runtime context.
+	 * 
+	 * @return The instantiated fetcher
+	 * 
+	 * @throws Exception The method should forward exceptions
+	 */
+	protected abstract AbstractFetcher<T, ?> createFetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> thisSubtaskPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext) throws Exception;
+
+	protected abstract List<KafkaTopicPartition> getKafkaPartitions(List<String> topics);
+	
+	// ------------------------------------------------------------------------
+	//  ResultTypeQueryable methods 
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return deserializer.getProducedType();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
+		subscribedPartitions = new ArrayList<>();
+
+		if (restoreToOffset != null) {
+			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
+				if (restoreToOffset.containsKey(kafkaTopicPartition)) {
+					subscribedPartitions.add(kafkaTopicPartition);
+				}
+			}
+		} else {
+			Collections.sort(kafkaTopicPartitions, new Comparator<KafkaTopicPartition>() {
+				@Override
+				public int compare(KafkaTopicPartition o1, KafkaTopicPartition o2) {
+					int topicComparison = o1.getTopic().compareTo(o2.getTopic());
+
+					if (topicComparison == 0) {
+						return o1.getPartition() - o2.getPartition();
+					} else {
+						return topicComparison;
+					}
+				}
+			});
+
+			for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) {
+				subscribedPartitions.add(kafkaTopicPartitions.get(i));
+			}
+		}
+	}
+
+	/**
+	 * Selects which of the given partitions should be handled by a specific consumer,
+	 * given a certain number of consumers.
+	 * 
+	 * @param allPartitions The partitions to select from
+	 * @param numConsumers The number of consumers
+	 * @param consumerIndex The index of the specific consumer
+	 * 
+	 * @return The sublist of partitions to be handled by that consumer.
+	 */
+	protected static List<KafkaTopicPartition> assignPartitions(
+			List<KafkaTopicPartition> allPartitions,
+			int numConsumers, int consumerIndex) {
+		final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
+				allPartitions.size() / numConsumers + 1);
+
+		for (int i = 0; i < allPartitions.size(); i++) {
+			if (i % numConsumers == consumerIndex) {
+				thisSubtaskPartitions.add(allPartitions.get(i));
+			}
+		}
+		
+		return thisSubtaskPartitions;
+	}
+	
+	/**
+	 * Logs the partition information in INFO level.
+	 * 
+	 * @param logger The logger to log to.
+	 * @param partitionInfos List of subscribed partitions
+	 */
+	protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> partitionInfos) {
+		Map<String, Integer> countPerTopic = new HashMap<>();
+		for (KafkaTopicPartition partition : partitionInfos) {
+			Integer count = countPerTopic.get(partition.getTopic());
+			if (count == null) {
+				count = 1;
+			} else {
+				count++;
+			}
+			countPerTopic.put(partition.getTopic(), count);
+		}
+		StringBuilder sb = new StringBuilder(
+				"Consumer is going to read the following topics (with number of partitions): ");
+		
+		for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
+			sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
+		}
+		
+		logger.info(sb.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
new file mode 100644
index 0000000..d413f1c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Collections;
+import java.util.Comparator;
+
+import static java.util.Objects.requireNonNull;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic.
+ *
+ * Please note that this producer provides at-least-once reliability guarantees when
+ * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
+ * Otherwise, the producer doesn't provide any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Configuration key for disabling the metrics reporting
+	 */
+	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+	/**
+	 * Array with the partition ids of the given defaultTopicId
+	 * The size of this array is the number of partitions
+	 */
+	protected int[] partitions;
+
+	/**
+	 * User defined properties for the Producer
+	 */
+	protected final Properties producerConfig;
+
+	/**
+	 * The name of the default topic this producer is writing data to
+	 */
+	protected final String defaultTopicId;
+
+	/**
+	 * (Serializable) SerializationSchema for turning objects used with Flink into
+	 * byte[] for Kafka.
+	 */
+	protected final KeyedSerializationSchema<IN> schema;
+
+	/**
+	 * User-provided partitioner for assigning an object to a Kafka partition.
+	 */
+	protected final KafkaPartitioner<IN> partitioner;
+
+	/**
+	 * Flag indicating whether to accept failures (and log them), or to fail on failures
+	 */
+	protected boolean logFailuresOnly;
+
+	/**
+	 * If true, the producer will wait until all outstanding records have been send to the broker.
+	 */
+	protected boolean flushOnCheckpoint;
+	
+	// -------------------------------- Runtime fields ------------------------------------------
+
+	/** KafkaProducer instance */
+	protected transient KafkaProducer<byte[], byte[]> producer;
+
+	/** The callback than handles error propagation or logging callbacks */
+	protected transient Callback callback;
+
+	/** Errors encountered in the async producer are stored here */
+	protected transient volatile Exception asyncException;
+
+	/** Lock for accessing the pending records */
+	protected final SerializableObject pendingRecordsLock = new SerializableObject();
+
+	/** Number of unacknowledged records. */
+	protected long pendingRecords;
+
+	protected OperatorStateStore stateStore;
+
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param defaultTopicId The default topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner
+	 */
+	public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		requireNonNull(defaultTopicId, "TopicID not set");
+		requireNonNull(serializationSchema, "serializationSchema not set");
+		requireNonNull(producerConfig, "producerConfig not set");
+		ClosureCleaner.clean(customPartitioner, true);
+		ClosureCleaner.ensureSerializable(serializationSchema);
+
+		this.defaultTopicId = defaultTopicId;
+		this.schema = serializationSchema;
+		this.producerConfig = producerConfig;
+
+		// set the producer configuration properties for kafka record key value serializers.
+		if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+		}
+
+		if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+			this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+		}
+
+		// eagerly ensure that bootstrap servers are set.
+		if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+			throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
+		}
+
+		this.partitioner = customPartitioner;
+	}
+
+	// ---------------------------------- Properties --------------------------
+
+	/**
+	 * Defines whether the producer should fail on errors, or only log them.
+	 * If this is set to true, then exceptions will be only logged, if set to false,
+	 * exceptions will be eventually thrown and cause the streaming program to 
+	 * fail (and enter recovery).
+	 * 
+	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+	 */
+	public void setLogFailuresOnly(boolean logFailuresOnly) {
+		this.logFailuresOnly = logFailuresOnly;
+	}
+
+	/**
+	 * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
+	 * to be acknowledged by the Kafka producer on a checkpoint.
+	 * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
+	 *
+	 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+	 */
+	public void setFlushOnCheckpoint(boolean flush) {
+		this.flushOnCheckpoint = flush;
+	}
+
+	/**
+	 * Used for testing only
+	 */
+	protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) {
+		return new KafkaProducer<>(props);
+	}
+
+	// ----------------------------------- Utilities --------------------------
+	
+	/**
+	 * Initializes the connection to Kafka.
+	 */
+	@Override
+	public void open(Configuration configuration) {
+		producer = getKafkaProducer(this.producerConfig);
+
+		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
+		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
+
+		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+			@Override
+			public int compare(PartitionInfo o1, PartitionInfo o2) {
+				return Integer.compare(o1.partition(), o2.partition());
+			}
+		});
+
+		partitions = new int[partitionsList.size()];
+		for (int i = 0; i < partitions.length; i++) {
+			partitions[i] = partitionsList.get(i).partition();
+		}
+
+		RuntimeContext ctx = getRuntimeContext();
+		if (partitioner != null) {
+			partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
+		}
+
+		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", 
+				ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
+
+		// register Kafka metrics to Flink accumulators
+		if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
+			Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
+
+			if (metrics == null) {
+				// MapR's Kafka implementation returns null here.
+				LOG.info("Producer implementation does not support metrics");
+			} else {
+				final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
+				for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+					kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+				}
+			}
+		}
+
+		if (flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) {
+			LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
+			flushOnCheckpoint = false;
+		}
+
+		if (logFailuresOnly) {
+			callback = new Callback() {
+				@Override
+				public void onCompletion(RecordMetadata metadata, Exception e) {
+					if (e != null) {
+						LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
+					}
+					acknowledgeMessage();
+				}
+			};
+		}
+		else {
+			callback = new Callback() {
+				@Override
+				public void onCompletion(RecordMetadata metadata, Exception exception) {
+					if (exception != null && asyncException == null) {
+						asyncException = exception;
+					}
+					acknowledgeMessage();
+				}
+			};
+		}
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Kafka.
+	 *
+	 * @param next
+	 * 		The incoming data
+	 */
+	@Override
+	public void invoke(IN next) throws Exception {
+		// propagate asynchronous errors
+		checkErroneous();
+
+		byte[] serializedKey = schema.serializeKey(next);
+		byte[] serializedValue = schema.serializeValue(next);
+		String targetTopic = schema.getTargetTopic(next);
+		if (targetTopic == null) {
+			targetTopic = defaultTopicId;
+		}
+
+		ProducerRecord<byte[], byte[]> record;
+		if (partitioner == null) {
+			record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
+		} else {
+			record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
+		}
+		if (flushOnCheckpoint) {
+			synchronized (pendingRecordsLock) {
+				pendingRecords++;
+			}
+		}
+		producer.send(record, callback);
+	}
+
+
+	@Override
+	public void close() throws Exception {
+		if (producer != null) {
+			producer.close();
+		}
+		
+		// make sure we propagate pending errors
+		checkErroneous();
+	}
+
+	// ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+	private void acknowledgeMessage() {
+		if (flushOnCheckpoint) {
+			synchronized (pendingRecordsLock) {
+				pendingRecords--;
+				if (pendingRecords == 0) {
+					pendingRecordsLock.notifyAll();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Flush pending records.
+	 */
+	protected abstract void flush();
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		this.stateStore = context.getOperatorStateStore();
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
+		if (flushOnCheckpoint) {
+			// flushing is activated: We need to wait until pendingRecords is 0
+			flush();
+			synchronized (pendingRecordsLock) {
+				if (pendingRecords != 0) {
+					throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
+				}
+				// pending records count is 0. We can now confirm the checkpoint
+			}
+		}
+	}
+
+	// ----------------------------------- Utilities --------------------------
+
+	protected void checkErroneous() throws Exception {
+		Exception e = asyncException;
+		if (e != null) {
+			// prevent double throwing
+			asyncException = null;
+			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+		}
+	}
+	
+	public static Properties getPropertiesFromBrokerList(String brokerList) {
+		String[] elements = brokerList.split(",");
+		
+		// validate the broker addresses
+		for (String broker: elements) {
+			NetUtils.getCorrectHostnamePort(broker);
+		}
+		
+		Properties props = new Properties();
+		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+		return props;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
new file mode 100644
index 0000000..ee98783
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Base class for {@link KafkaTableSink} that serializes data in JSON format
+ */
+public abstract class KafkaJsonTableSink extends KafkaTableSink {
+
+	/**
+	 * Creates KafkaJsonTableSink
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+		super(topic, properties, partitioner);
+	}
+
+	@Override
+	protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {
+		return new JsonRowSerializationSchema(fieldNames);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
new file mode 100644
index 0000000..f145509
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka JSON {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ *
+ * <p>The field names are used to parse the JSON file and so are the types.
+ */
+public abstract class KafkaJsonTableSource extends KafkaTableSource {
+
+	/**
+	 * Creates a generic Kafka JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types.
+	 */
+	KafkaJsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a generic Kafka JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types.
+	 */
+	KafkaJsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Configures the failure behaviour if a JSON field is missing.
+	 *
+	 * <p>By default, a missing field is ignored and the field is set to null.
+	 *
+	 * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+	 */
+	public void setFailOnMissingField(boolean failOnMissingField) {
+		JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema();
+		deserializationSchema.setFailOnMissingField(failOnMissingField);
+	}
+
+	private static JsonRowDeserializationSchema createDeserializationSchema(
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+	}
+
+	private static JsonRowDeserializationSchema createDeserializationSchema(
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
new file mode 100644
index 0000000..714d9cd
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sinks.StreamTableSink;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link StreamTableSink}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}.
+ */
+public abstract class KafkaTableSink implements StreamTableSink<Row> {
+
+	protected final String topic;
+	protected final Properties properties;
+	protected SerializationSchema<Row> serializationSchema;
+	protected final KafkaPartitioner<Row> partitioner;
+	protected String[] fieldNames;
+	protected TypeInformation[] fieldTypes;
+
+	/**
+	 * Creates KafkaTableSink
+	 *
+	 * @param topic                 Kafka topic to write to.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param partitioner           Partitioner to select Kafka partition for each item
+	 */
+	public KafkaTableSink(
+			String topic,
+			Properties properties,
+			KafkaPartitioner<Row> partitioner) {
+
+		this.topic = Preconditions.checkNotNull(topic, "topic");
+		this.properties = Preconditions.checkNotNull(properties, "properties");
+		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+	}
+
+	/**
+	 * Returns the version-specifid Kafka producer.
+	 *
+	 * @param topic               Kafka topic to produce to.
+	 * @param properties          Properties for the Kafka producer.
+	 * @param serializationSchema Serialization schema to use to create Kafka records.
+	 * @param partitioner         Partitioner to select Kafka partition.
+	 * @return The version-specific Kafka producer
+	 */
+	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
+		String topic, Properties properties,
+		SerializationSchema<Row> serializationSchema,
+		KafkaPartitioner<Row> partitioner);
+
+	/**
+	 * Create serialization schema for converting table rows into bytes.
+	 *
+	 * @param fieldNames Field names in table rows.
+	 * @return Instance of serialization schema
+	 */
+	protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
+
+	/**
+	 * Create a deep copy of this sink.
+	 *
+	 * @return Deep copy of this sink
+	 */
+	protected abstract KafkaTableSink createCopy();
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
+		dataStream.addSink(kafkaProducer);
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(getFieldTypes());
+	}
+
+	public String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	@Override
+	public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		KafkaTableSink copy = createCopy();
+		copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
+		copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
+		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+			"Number of provided field names and types does not match.");
+		copy.serializationSchema = createSerializationSchema(fieldNames);
+
+		return copy;
+	}
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
new file mode 100644
index 0000000..fd423d7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo;
+
+/**
+ * A version-agnostic Kafka {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+public abstract class KafkaTableSource implements StreamTableSource<Row> {
+
+	/** The Kafka topic to consume. */
+	private final String topic;
+
+	/** Properties for the Kafka consumer. */
+	private final Properties properties;
+
+	/** Deserialization schema to use for Kafka records. */
+	private final DeserializationSchema<Row> deserializationSchema;
+
+	/** Row field names. */
+	private final String[] fieldNames;
+
+	/** Row field types. */
+	private final TypeInformation<?>[] fieldTypes;
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types.
+	 */
+	KafkaTableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
+	}
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types.
+	 */
+	KafkaTableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		this.topic = Preconditions.checkNotNull(topic, "Topic");
+		this.properties = Preconditions.checkNotNull(properties, "Properties");
+		this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
+		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
+		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
+
+		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+				"Number of provided field names and types does not match.");
+	}
+
+	/**
+	 * NOTE: This method is for internal use only for defining a TableSource.
+	 *       Do not use it in Table API programs.
+	 */
+	@Override
+	public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
+		// Version-specific Kafka consumer
+		FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
+		DataStream<Row> kafkaSource = env.addSource(kafkaConsumer);
+		return kafkaSource;
+	}
+
+	@Override
+	public int getNumberOfFields() {
+		return fieldNames.length;
+	}
+
+	@Override
+	public String[] getFieldsNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	@Override
+	public TypeInformation<Row> getReturnType() {
+		return new RowTypeInfo(fieldTypes);
+	}
+
+	/**
+	 * Returns the version-specific Kafka consumer.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @return The version-specific Kafka consumer
+	 */
+	abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema);
+
+	/**
+	 * Returns the deserialization schema.
+	 *
+	 * @return The deserialization schema
+	 */
+	protected DeserializationSchema<Row> getDeserializationSchema() {
+		return deserializationSchema;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
new file mode 100644
index 0000000..cf39606
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all fetchers, which implement the connections to Kafka brokers and
+ * pull records from Kafka partitions.
+ * 
+ * <p>This fetcher base class implements the logic around emitting records and tracking offsets,
+ * as well as around the optional timestamp assignment and watermark generation. 
+ * 
+ * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into
+ *            the Flink data streams.
+ * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
+ */
+public abstract class AbstractFetcher<T, KPH> {
+	
+	protected static final int NO_TIMESTAMPS_WATERMARKS = 0;
+	protected static final int PERIODIC_WATERMARKS = 1;
+	protected static final int PUNCTUATED_WATERMARKS = 2;
+	
+	// ------------------------------------------------------------------------
+	
+	/** The source context to emit records and watermarks to */
+	protected final SourceContext<T> sourceContext;
+
+	/** The lock that guarantees that record emission and state updates are atomic,
+	 * from the view of taking a checkpoint */
+	protected final Object checkpointLock;
+
+	/** All partitions (and their state) that this fetcher is subscribed to */
+	private final KafkaTopicPartitionState<KPH>[] allPartitions;
+
+	/** The mode describing whether the fetcher also generates timestamps and watermarks */
+	protected final int timestampWatermarkMode;
+
+	/** Flag whether to register metrics for the fetcher */
+	protected final boolean useMetrics;
+
+	/** Only relevant for punctuated watermarks: The current cross partition watermark */
+	private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
+
+	// ------------------------------------------------------------------------
+	
+	protected AbstractFetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> assignedPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			ProcessingTimeService processingTimeProvider,
+			long autoWatermarkInterval,
+			ClassLoader userCodeClassLoader,
+			boolean useMetrics) throws Exception
+	{
+		this.sourceContext = checkNotNull(sourceContext);
+		this.checkpointLock = sourceContext.getCheckpointLock();
+		this.useMetrics = useMetrics;
+		
+		// figure out what we watermark mode we will be using
+		
+		if (watermarksPeriodic == null) {
+			if (watermarksPunctuated == null) {
+				// simple case, no watermarks involved
+				timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
+			} else {
+				timestampWatermarkMode = PUNCTUATED_WATERMARKS;
+			}
+		} else {
+			if (watermarksPunctuated == null) {
+				timestampWatermarkMode = PERIODIC_WATERMARKS;
+			} else {
+				throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
+			}
+		}
+		
+		// create our partition state according to the timestamp/watermark mode 
+		this.allPartitions = initializePartitions(
+				assignedPartitions,
+				timestampWatermarkMode,
+				watermarksPeriodic, watermarksPunctuated,
+				userCodeClassLoader);
+		
+		// if we have periodic watermarks, kick off the interval scheduler
+		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+			KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
+					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
+			
+			PeriodicWatermarkEmitter periodicEmitter = 
+					new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
+			periodicEmitter.start();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets all partitions (with partition state) that this fetcher is subscribed to.
+	 *
+	 * @return All subscribed partitions.
+	 */
+	protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
+		return allPartitions;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Core fetcher work methods
+	// ------------------------------------------------------------------------
+
+	public abstract void runFetchLoop() throws Exception;
+	
+	public abstract void cancel();
+
+	// ------------------------------------------------------------------------
+	//  Kafka version specifics
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Creates the Kafka version specific representation of the given
+	 * topic partition.
+	 * 
+	 * @param partition The Flink representation of the Kafka topic partition.
+	 * @return The specific Kafka representation of the Kafka topic partition.
+	 */
+	public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);
+
+	/**
+	 * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
+	 * older Kafka versions). The given offsets are the internal checkpointed offsets, representing
+	 * the last processed record of each partition. Version-specific implementations of this method
+	 * need to hold the contract that the given offsets must be incremented by 1 before
+	 * committing them, so that committed offsets to Kafka represent "the next record to process".
+	 * 
+	 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
+	 * @throws Exception This method forwards exceptions.
+	 */
+	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+	
+	// ------------------------------------------------------------------------
+	//  snapshot and restore the state
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Takes a snapshot of the partition offsets.
+	 * 
+	 * <p>Important: This method mus be called under the checkpoint lock.
+	 * 
+	 * @return A map from partition to current offset.
+	 */
+	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+		// this method assumes that the checkpoint lock is held
+		assert Thread.holdsLock(checkpointLock);
+
+		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
+		for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
+			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		}
+		return state;
+	}
+
+	/**
+	 * Restores the partition offsets.
+	 * 
+	 * @param snapshotState The offsets for the partitions 
+	 */
+	public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
+		for (KafkaTopicPartitionState<?> partition : allPartitions) {
+			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
+			if (offset != null) {
+				partition.setOffset(offset);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  emitting records
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Emits a record without attaching an existing timestamp to it.
+	 * 
+	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
+	 * That makes the fast path efficient, the extended paths are called as separate methods.
+	 * 
+	 * @param record The record to emit
+	 * @param partitionState The state of the Kafka partition from which the record was fetched
+	 * @param offset The offset of the record
+	 */
+	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
+		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+			// fast path logic, in case there are no watermarks
+
+			// emit the record, using the checkpoint lock to guarantee
+			// atomicity of record emission and offset state update
+			synchronized (checkpointLock) {
+				sourceContext.collect(record);
+				partitionState.setOffset(offset);
+			}
+		}
+		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
+		}
+		else {
+			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
+		}
+	}
+
+	/**
+	 * Emits a record attaching a timestamp to it.
+	 *
+	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
+	 * That makes the fast path efficient, the extended paths are called as separate methods.
+	 *
+	 * @param record The record to emit
+	 * @param partitionState The state of the Kafka partition from which the record was fetched
+	 * @param offset The offset of the record
+	 */
+	protected void emitRecordWithTimestamp(
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
+
+		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+			// fast path logic, in case there are no watermarks generated in the fetcher
+
+			// emit the record, using the checkpoint lock to guarantee
+			// atomicity of record emission and offset state update
+			synchronized (checkpointLock) {
+				sourceContext.collectWithTimestamp(record, timestamp);
+				partitionState.setOffset(offset);
+			}
+		}
+		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
+		}
+		else {
+			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
+		}
+	}
+
+	/**
+	 * Record emission, if a timestamp will be attached from an assigner that is
+	 * also a periodic watermark generator.
+	 */
+	protected void emitRecordWithTimestampAndPeriodicWatermark(
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
+	{
+		@SuppressWarnings("unchecked")
+		final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
+				(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
+
+		// extract timestamp - this accesses/modifies the per-partition state inside the
+		// watermark generator instance, so we need to lock the access on the
+		// partition state. concurrent access can happen from the periodic emitter
+		final long timestamp;
+		//noinspection SynchronizationOnLocalVariableOrMethodParameter
+		synchronized (withWatermarksState) {
+			timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
+		}
+
+		// emit the record with timestamp, using the usual checkpoint lock to guarantee
+		// atomicity of record emission and offset state update 
+		synchronized (checkpointLock) {
+			sourceContext.collectWithTimestamp(record, timestamp);
+			partitionState.setOffset(offset);
+		}
+	}
+
+	/**
+	 * Record emission, if a timestamp will be attached from an assigner that is
+	 * also a punctuated watermark generator.
+	 */
+	protected void emitRecordWithTimestampAndPunctuatedWatermark(
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
+	{
+		@SuppressWarnings("unchecked")
+		final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+				(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
+
+		// only one thread ever works on accessing timestamps and watermarks
+		// from the punctuated extractor
+		final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
+		final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
+
+		// emit the record with timestamp, using the usual checkpoint lock to guarantee
+		// atomicity of record emission and offset state update 
+		synchronized (checkpointLock) {
+			sourceContext.collectWithTimestamp(record, timestamp);
+			partitionState.setOffset(offset);
+		}
+
+		// if we also have a new per-partition watermark, check if that is also a
+		// new cross-partition watermark
+		if (newWatermark != null) {
+			updateMinPunctuatedWatermark(newWatermark);
+		}
+	}
+
+	/**
+	 *Checks whether a new per-partition watermark is also a new cross-partition watermark.
+	 */
+	private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
+		if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
+			long newMin = Long.MAX_VALUE;
+
+			for (KafkaTopicPartitionState<?> state : allPartitions) {
+				@SuppressWarnings("unchecked")
+				final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
+				
+				newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
+			}
+
+			// double-check locking pattern
+			if (newMin > maxWatermarkSoFar) {
+				synchronized (checkpointLock) {
+					if (newMin > maxWatermarkSoFar) {
+						maxWatermarkSoFar = newMin;
+						sourceContext.emitWatermark(new Watermark(newMin));
+					}
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Utility method that takes the topic partitions and creates the topic partition state
+	 * holders. If a watermark generator per partition exists, this will also initialize those.
+	 */
+	private KafkaTopicPartitionState<KPH>[] initializePartitions(
+			List<KafkaTopicPartition> assignedPartitions,
+			int timestampWatermarkMode,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			ClassLoader userCodeClassLoader)
+		throws IOException, ClassNotFoundException
+	{
+		switch (timestampWatermarkMode) {
+			
+			case NO_TIMESTAMPS_WATERMARKS: {
+				@SuppressWarnings("unchecked")
+				KafkaTopicPartitionState<KPH>[] partitions =
+						(KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
+
+				int pos = 0;
+				for (KafkaTopicPartition partition : assignedPartitions) {
+					// create the kafka version specific partition handle
+					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+					partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle);
+				}
+
+				return partitions;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				@SuppressWarnings("unchecked")
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
+								new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
+
+				int pos = 0;
+				for (KafkaTopicPartition partition : assignedPartitions) {
+					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
+					AssignerWithPeriodicWatermarks<T> assignerInstance =
+							watermarksPeriodic.deserializeValue(userCodeClassLoader);
+					
+					partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+							partition, kafkaHandle, assignerInstance);
+				}
+
+				return partitions;
+			}
+
+			case PUNCTUATED_WATERMARKS: {
+				@SuppressWarnings("unchecked")
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
+								new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()];
+
+				int pos = 0;
+				for (KafkaTopicPartition partition : assignedPartitions) {
+					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
+					AssignerWithPunctuatedWatermarks<T> assignerInstance =
+							watermarksPunctuated.deserializeValue(userCodeClassLoader);
+
+					partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+							partition, kafkaHandle, assignerInstance);
+				}
+
+				return partitions;
+			}
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
+		}
+	}
+
+	// ------------------------- Metrics ----------------------------------
+
+	/**
+	 * Add current and committed offsets to metric group
+	 *
+	 * @param metricGroup The metric group to use
+	 */
+	protected void addOffsetStateGauge(MetricGroup metricGroup) {
+		// add current offsets to gage
+		MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
+		MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
+		for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
+			currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
+			committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
+		}
+	}
+
+	/**
+	 * Gauge types
+	 */
+	private enum OffsetGaugeType {
+		CURRENT_OFFSET,
+		COMMITTED_OFFSET
+	}
+
+	/**
+	 * Gauge for getting the offset of a KafkaTopicPartitionState.
+	 */
+	private static class OffsetGauge implements Gauge<Long> {
+
+		private final KafkaTopicPartitionState<?> ktp;
+		private final OffsetGaugeType gaugeType;
+
+		public OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
+			this.ktp = ktp;
+			this.gaugeType = gaugeType;
+		}
+
+		@Override
+		public Long getValue() {
+			switch(gaugeType) {
+				case COMMITTED_OFFSET:
+					return ktp.getCommittedOffset();
+				case CURRENT_OFFSET:
+					return ktp.getOffset();
+				default:
+					throw new RuntimeException("Unknown gauge type: " + gaugeType);
+			}
+		}
+	}
+ 	// ------------------------------------------------------------------------
+	
+	/**
+	 * The periodic watermark emitter. In its given interval, it checks all partitions for
+	 * the current event time watermark, and possibly emits the next watermark.
+	 */
+	private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
+
+		private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
+		
+		private final SourceContext<?> emitter;
+		
+		private final ProcessingTimeService timerService;
+
+		private final long interval;
+		
+		private long lastWatermarkTimestamp;
+		
+		//-------------------------------------------------
+
+		PeriodicWatermarkEmitter(
+				KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
+				SourceContext<?> emitter,
+				ProcessingTimeService timerService,
+				long autoWatermarkInterval)
+		{
+			this.allPartitions = checkNotNull(allPartitions);
+			this.emitter = checkNotNull(emitter);
+			this.timerService = checkNotNull(timerService);
+			this.interval = autoWatermarkInterval;
+			this.lastWatermarkTimestamp = Long.MIN_VALUE;
+		}
+
+		//-------------------------------------------------
+		
+		public void start() {
+			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+		}
+		
+		@Override
+		public void onProcessingTime(long timestamp) throws Exception {
+
+			long minAcrossAll = Long.MAX_VALUE;
+			for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
+				
+				// we access the current watermark for the periodic assigners under the state
+				// lock, to prevent concurrent modification to any internal variables
+				final long curr;
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (state) {
+					curr = state.getCurrentWatermarkTimestamp();
+				}
+				
+				minAcrossAll = Math.min(minAcrossAll, curr);
+			}
+			
+			// emit next watermark, if there is one
+			if (minAcrossAll > lastWatermarkTimestamp) {
+				lastWatermarkTimestamp = minAcrossAll;
+				emitter.emitWatermark(new Watermark(minAcrossAll));
+			}
+			
+			// schedule the next watermark
+			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
new file mode 100644
index 0000000..c736493
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A proxy that communicates exceptions between threads. Typically used if an exception
+ * from a spawned thread needs to be recognized by the "parent" (spawner) thread.
+ * 
+ * <p>The spawned thread would set the exception via {@link #reportError(Throwable)}.
+ * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}.
+ * Optionally, the parent can pass itself in the constructor to be interrupted as soon as
+ * an exception occurs.
+ * 
+ * <pre>
+ * {@code
+ * 
+ * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());
+ * 
+ * Thread subThread = new Thread() {
+ * 
+ *     public void run() {
+ *         try {
+ *             doSomething();
+ *         } catch (Throwable t) {
+ *             errorProxy.reportError(
+ *         } finally {
+ *             doSomeCleanup();
+ *         }
+ *     }
+ * };
+ * subThread.start();
+ * 
+ * doSomethingElse();
+ * errorProxy.checkAndThrowException();
+ * 
+ * doSomethingMore();
+ * errorProxy.checkAndThrowException();
+ * 
+ * try {
+ *     subThread.join();
+ * } catch (InterruptedException e) {
+ *     errorProxy.checkAndThrowException();
+ *     // restore interrupted status, if not caused by an exception
+ *     Thread.currentThread().interrupt();
+ * }
+ * }
+ * </pre>
+ */
+public class ExceptionProxy {
+	
+	/** The thread that should be interrupted when an exception occurs */
+	private final Thread toInterrupt;
+	
+	/** The exception to throw */ 
+	private final AtomicReference<Throwable> exception;
+
+	/**
+	 * Creates an exception proxy that interrupts the given thread upon
+	 * report of an exception. The thread to interrupt may be null.
+	 * 
+	 * @param toInterrupt The thread to interrupt upon an exception. May be null.
+	 */
+	public ExceptionProxy(@Nullable Thread toInterrupt) {
+		this.toInterrupt = toInterrupt;
+		this.exception = new AtomicReference<>();
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Sets the exception and interrupts the target thread,
+	 * if no other exception has occurred so far.
+	 * 
+	 * <p>The exception is only set (and the interruption is only triggered),
+	 * if no other exception was set before.
+	 * 
+	 * @param t The exception that occurred
+	 */
+	public void reportError(Throwable t) {
+		// set the exception, if it is the first (and the exception is non null)
+		if (t != null && exception.compareAndSet(null, t) && toInterrupt != null) {
+			toInterrupt.interrupt();
+		}
+	}
+
+	/**
+	 * Checks whether an exception has been set via {@link #reportError(Throwable)}.
+	 * If yes, that exception if re-thrown by this method.
+	 * 
+	 * @throws Exception This method re-throws the exception, if set.
+	 */
+	public void checkAndThrowException() throws Exception {
+		Throwable t = exception.get();
+		if (t != null) {
+			if (t instanceof Exception) {
+				throw (Exception) t;
+			}
+			else if (t instanceof Error) {
+				throw (Error) t;
+			}
+			else {
+				throw new Exception(t);
+			}
+		}
+	}
+}