Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:22 UTC

[32/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
new file mode 100644
index 0000000..c68fe28
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink's description of a partition in a Kafka topic.
+ * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...)
+ * 
+ * <p>Note: This class must not change in its structure, because it would change the
+ * serialization format and make previous savepoints unreadable.
+ */
+public final class KafkaTopicPartition implements Serializable {
+
+	/** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
+	 * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
+	private static final long serialVersionUID = 722083576322742325L;
+	
+	// ------------------------------------------------------------------------
+
+	private final String topic;
+	private final int partition;
+	private final int cachedHash;
+
+	public KafkaTopicPartition(String topic, int partition) {
+		this.topic = requireNonNull(topic);
+		this.partition = partition;
+		this.cachedHash = 31 * topic.hashCode() + partition;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	public String getTopic() {
+		return topic;
+	}
+
+	public int getPartition() {
+		return partition;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return "KafkaTopicPartition{" +
+				"topic='" + topic + '\'' +
+				", partition=" + partition +
+				'}';
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o instanceof KafkaTopicPartition) {
+			KafkaTopicPartition that = (KafkaTopicPartition) o;
+			return this.partition == that.partition && this.topic.equals(that.topic);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	public static String toString(Map<KafkaTopicPartition, Long> map) {
+		StringBuilder sb = new StringBuilder();
+		for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
+			KafkaTopicPartition ktp = p.getKey();
+			sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", ");
+		}
+		return sb.toString();
+	}
+
+	public static String toString(List<KafkaTopicPartition> partitions) {
+		StringBuilder sb = new StringBuilder();
+		for (KafkaTopicPartition p: partitions) {
+			sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", ");
+		}
+		return sb.toString();
+	}
+
+
+	public static List<KafkaTopicPartition> dropLeaderData(List<KafkaTopicPartitionLeader> partitionInfos) {
+		List<KafkaTopicPartition> ret = new ArrayList<>(partitionInfos.size());
+		for (KafkaTopicPartitionLeader ktpl: partitionInfos) {
+			ret.add(ktpl.getTopicPartition());
+		}
+		return ret;
+	}
+}
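
For orientation, a minimal sketch of how this descriptor is typically used (the topic name and offsets are made up; only the calls on KafkaTopicPartition itself come from the class above):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

    public class PartitionOffsetsSketch {
        public static void main(String[] args) {
            // hypothetical offsets restored from a savepoint, keyed by Flink's descriptor
            Map<KafkaTopicPartition, Long> restoredOffsets = new HashMap<>();
            restoredOffsets.put(new KafkaTopicPartition("my-topic", 0), 42L);
            restoredOffsets.put(new KafkaTopicPartition("my-topic", 1), 17L);

            // equals()/hashCode() are based on (topic, partition), so a freshly
            // created descriptor works for lookups
            long offset = restoredOffsets.get(new KafkaTopicPartition("my-topic", 0)); // 42

            // formats the entries as "topic:partition=offset, ..."
            System.out.println(KafkaTopicPartition.toString(restoredOffsets));
            System.out.println(offset);
        }
    }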

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
new file mode 100644
index 0000000..1959a05
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.kafka.common.Node;
+
+import java.io.Serializable;
+
+/**
+ * Serializable Topic Partition info with leader Node information.
+ * This class is used at runtime.
+ */
+public class KafkaTopicPartitionLeader implements Serializable {
+
+	private static final long serialVersionUID = 9145855900303748582L;
+
+	private final int leaderId;
+	private final int leaderPort;
+	private final String leaderHost;
+	private final KafkaTopicPartition topicPartition;
+	private final int cachedHash;
+
+	public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) {
+		this.topicPartition = topicPartition;
+		if (leader == null) {
+			this.leaderId = -1;
+			this.leaderHost = null;
+			this.leaderPort = -1;
+		} else {
+			this.leaderId = leader.id();
+			this.leaderPort = leader.port();
+			this.leaderHost = leader.host();
+		}
+		int cachedHash = (leader == null) ? 14 : leader.hashCode();
+		this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
+	}
+
+	public KafkaTopicPartition getTopicPartition() {
+		return topicPartition;
+	}
+
+	public Node getLeader() {
+		if (this.leaderId == -1) {
+			return null;
+		} else {
+			return new Node(leaderId, leaderHost, leaderPort);
+		}
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KafkaTopicPartitionLeader)) {
+			return false;
+		}
+
+		KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
+
+		if (!topicPartition.equals(that.topicPartition)) {
+			return false;
+		}
+		return leaderId == that.leaderId && leaderPort == that.leaderPort
+				&& (leaderHost == null ? that.leaderHost == null : leaderHost.equals(that.leaderHost));
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+	@Override
+	public String toString() {
+		return "KafkaTopicPartitionLeader{" +
+				"leaderId=" + leaderId +
+				", leaderPort=" + leaderPort +
+				", leaderHost='" + leaderHost + '\'' +
+				", topic=" + topicPartition.getTopic() +
+				", partition=" + topicPartition.getPartition() +
+				'}';
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
new file mode 100644
index 0000000..7cb5f46
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * The state that the Flink Kafka Consumer holds for each Kafka partition.
+ * Includes the Kafka descriptor for partitions.
+ * 
+ * <p>This class describes the most basic state (only the offset), subclasses
+ * define more elaborate state, containing current watermarks and timestamp
+ * extractors.
+ * 
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public class KafkaTopicPartitionState<KPH> {
+
+	/** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
+	 * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
+	public static final long OFFSET_NOT_SET = -915623761776L;
+	
+	// ------------------------------------------------------------------------
+
+	/** The Flink description of a Kafka partition */
+	private final KafkaTopicPartition partition;
+
+	/** The Kafka description of a Kafka partition (varies across different Kafka versions) */
+	private final KPH kafkaPartitionHandle;
+	
+	/** The offset within the Kafka partition that we already processed */
+	private volatile long offset;
+
+	/** The offset of the Kafka partition that has been committed */
+	private volatile long committedOffset;
+
+	// ------------------------------------------------------------------------
+	
+	public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
+		this.partition = partition;
+		this.kafkaPartitionHandle = kafkaPartitionHandle;
+		this.offset = OFFSET_NOT_SET;
+		this.committedOffset = OFFSET_NOT_SET;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets Flink's descriptor for the Kafka Partition.
+	 * @return The Flink partition descriptor.
+	 */
+	public final KafkaTopicPartition getKafkaTopicPartition() {
+		return partition;
+	}
+
+	/**
+	 * Gets Kafka's descriptor for the Kafka Partition.
+	 * @return The Kafka partition descriptor.
+	 */
+	public final KPH getKafkaPartitionHandle() {
+		return kafkaPartitionHandle;
+	}
+
+	public final String getTopic() {
+		return partition.getTopic();
+	}
+
+	public final int getPartition() {
+		return partition.getPartition();
+	}
+
+	/**
+	 * The current offset in the partition. This refers to the offset of the last element that
+	 * we retrieved and emitted successfully. It is the offset that should be stored in
+	 * a checkpoint.
+	 */
+	public final long getOffset() {
+		return offset;
+	}
+
+	public final void setOffset(long offset) {
+		this.offset = offset;
+	}
+
+	public final boolean isOffsetDefined() {
+		return offset != OFFSET_NOT_SET;
+	}
+
+	public final void setCommittedOffset(long offset) {
+		this.committedOffset = offset;
+	}
+
+	public final long getCommittedOffset() {
+		return committedOffset;
+	}
+
+	
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "Partition: " + partition + ", KafkaPartitionHandle=" + kafkaPartitionHandle
+				+ ", offset=" + (isOffsetDefined() ? String.valueOf(offset) : "(not set)");
+	}
+}
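
A rough sketch of how a fetcher could track progress with this state object; the String handle below merely stands in for whatever the concrete Kafka version uses as the KPH type:

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;

    public class PartitionStateSketch {
        public static void main(String[] args) {
            KafkaTopicPartition partition = new KafkaTopicPartition("my-topic", 3);

            // KPH is version-specific; a plain String is used here purely for illustration
            KafkaTopicPartitionState<String> state =
                    new KafkaTopicPartitionState<>(partition, "my-topic-3");

            // offsets start as OFFSET_NOT_SET until the first record is emitted
            if (!state.isOffsetDefined()) {
                state.setOffset(100L); // offset of the last record emitted downstream
            }

            // at checkpoint time the current offset is snapshotted; once Kafka
            // acknowledges the commit, it is recorded here as well
            state.setCommittedOffset(state.getOffset());
            System.out.println(state);
        }
    }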

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
new file mode 100644
index 0000000..efdc73f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a periodic watermark generator (and timestamp extractor) per partition.
+ * 
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+	
+	/** The timestamp assigner and watermark generator for the partition */
+	private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
+	
+	/** The last watermark timestamp generated by this partition */
+	private long partitionWatermark;
+
+	// ------------------------------------------------------------------------
+	
+	public KafkaTopicPartitionStateWithPeriodicWatermarks(
+			KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+			AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
+	{
+		super(partition, kafkaPartitionHandle);
+		
+		this.timestampsAndWatermarks = timestampsAndWatermarks;
+		this.partitionWatermark = Long.MIN_VALUE;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
+		return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
+	}
+	
+	public long getCurrentWatermarkTimestamp() {
+		Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
+		if (wm != null) {
+			partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
+		}
+		return partitionWatermark;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()
+				+ ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
+	}
+}
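
A hedged sketch of the interplay with a periodic assigner; the assigner below is hypothetical (each Long record is its own timestamp) and the String handle again stands in for the version-specific KPH type:

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateWithPeriodicWatermarks;

    public class PeriodicWatermarkStateSketch {
        public static void main(String[] args) {
            // hypothetical assigner: the watermark trails the largest timestamp seen so far by 1 ms
            AssignerWithPeriodicWatermarks<Long> assigner = new AssignerWithPeriodicWatermarks<Long>() {
                private long maxTs = Long.MIN_VALUE;

                @Override
                public long extractTimestamp(Long element, long previousElementTimestamp) {
                    maxTs = Math.max(maxTs, element);
                    return element;
                }

                @Override
                public Watermark getCurrentWatermark() {
                    return maxTs == Long.MIN_VALUE ? null : new Watermark(maxTs - 1);
                }
            };

            KafkaTopicPartitionStateWithPeriodicWatermarks<Long, String> state =
                    new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                            new KafkaTopicPartition("my-topic", 0), "my-topic-0", assigner);

            long ts = state.getTimestampForRecord(1000L, -1L); // 1000
            long wm = state.getCurrentWatermarkTimestamp();    // 999
            System.out.println(ts + " / " + wm);
        }
    }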

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
new file mode 100644
index 0000000..edf40ce
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a punctuated watermark generator (and timestamp extractor) per partition.
+ * 
+ * <p>This class is not thread safe, but it gives volatile access to the current
+ * partition watermark ({@link #getCurrentPartitionWatermark()}).
+ * 
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
+ */
+public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+	
+	/** The timestamp assigner and watermark generator for the partition */
+	private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks;
+	
+	/** The last watermark timestamp generated by this partition */
+	private volatile long partitionWatermark;
+
+	// ------------------------------------------------------------------------
+	
+	public KafkaTopicPartitionStateWithPunctuatedWatermarks(
+			KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+			AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks)
+	{
+		super(partition, kafkaPartitionHandle);
+		
+		this.timestampsAndWatermarks = timestampsAndWatermarks;
+		this.partitionWatermark = Long.MIN_VALUE;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
+		return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
+	}
+
+	@Nullable
+	public Watermark checkAndGetNewWatermark(T record, long timestamp) {
+		Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
+		if (mark != null && mark.getTimestamp() > partitionWatermark) {
+			partitionWatermark = mark.getTimestamp();
+			return mark;
+		}
+		else {
+			return null;
+		}
+	}
+	
+	public long getCurrentPartitionWatermark() {
+		return partitionWatermark;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition()
+				+ ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
+	}
+}
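
The punctuated variant is driven per record rather than per timer; a short fragment of how a fetcher might use it ('state', 'event', 'kafkaTimestamp', and the emit helper are all assumed names, not part of this commit):

    // state is a KafkaTopicPartitionStateWithPunctuatedWatermarks<MyEvent, ?> built
    // with an AssignerWithPunctuatedWatermarks<MyEvent>
    long ts = state.getTimestampForRecord(event, kafkaTimestamp);
    Watermark newWatermark = state.checkAndGetNewWatermark(event, ts);
    if (newWatermark != null) {
        // non-null only when this partition's watermark actually advanced
        emitIfOverallMinimumAdvanced(newWatermark); // hypothetical helper
    }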

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
new file mode 100644
index 0000000..7a41ade
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class TypeUtil {
+	private TypeUtil() {}
+
+	/**
+	 * Creates TypeInformation array for an array of Classes.
+	 * @param fieldTypes classes to extract type information from
+	 * @return type information
+	 */
+	public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
+		TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length];
+		for (int i = 0; i < fieldTypes.length; i++) {
+			typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
+		}
+		return typeInfos;
+	}
+}
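
A one-line sketch of the utility (the field classes are arbitrary examples); the resulting array is the form expected, for instance, by JsonRowDeserializationSchema further down in this commit:

    TypeInformation<?>[] types =
            TypeUtil.toTypeInfo(new Class<?>[] { Long.class, String.class, Double.class });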

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
new file mode 100644
index 0000000..cedb696
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * Gauge for getting the current value of a Kafka metric.
+ */
+public class KafkaMetricWrapper implements Gauge<Double> {
+	private final org.apache.kafka.common.Metric kafkaMetric;
+
+	public KafkaMetricWrapper(org.apache.kafka.common.Metric metric) {
+		this.kafkaMetric = metric;
+	}
+
+	@Override
+	public Double getValue() {
+		return kafkaMetric.value();
+	}
+}
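
A hedged fragment of the intended use: bridging Kafka client metrics into Flink's metric system. The metricGroup (org.apache.flink.metrics.MetricGroup) and kafkaProducer variables are assumed to exist, for example inside a rich function's open() method, and the exact registration point varies by connector version:

    for (Map.Entry<MetricName, ? extends Metric> entry : kafkaProducer.metrics().entrySet()) {
        metricGroup.gauge(entry.getKey().name(), new KafkaMetricWrapper(entry.getValue()));
    }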

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
new file mode 100644
index 0000000..9b848e0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * Note that one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * 	# More Flink partitions than Kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	----------------&gt;	1
+ * 			2   --------------/
+ * 			3   -------------/
+ * 			4	------------/
+ * </pre>
+ * Some (or all) Kafka partitions contain the output of more than one Flink partition.
+ *
+ * 	# Fewer Flink partitions than Kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	----------------&gt;	1
+ * 			2	----------------&gt;	2
+ * 										3
+ * 										4
+ * 										5
+ * </pre>
+ *
+ *  Not all Kafka partitions contain data.
+ *  To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this will
+ *  cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ */
+public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
+	private static final long serialVersionUID = 1627268846962918126L;
+
+	private int targetPartition = -1;
+
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {
+			throw new IllegalArgumentException();
+		}
+		
+		this.targetPartition = partitions[parallelInstanceId % partitions.length];
+	}
+
+	@Override
+	public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+		if (targetPartition >= 0) {
+			return targetPartition;
+		} else {
+			throw new RuntimeException("The partitioner has not been initialized properly");
+		}
+	}
+}
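
A small sketch of the mapping described in the class comment (the numbers are illustrative):

    FixedPartitioner<String> partitioner = new FixedPartitioner<>();

    // sink subtask 2 of 4 parallel subtasks, writing to a topic with partitions {0, 1, 2}
    partitioner.open(2, 4, new int[] {0, 1, 2});

    // every record from this subtask goes to the same Kafka partition: 2 % 3 = 2
    int target = partitioner.partition("some-record", null, null, 3);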

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
new file mode 100644
index 0000000..37e2ef6
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner that assigns the records produced by Flink to partitions of a Kafka topic.
+ * It contains an open() method which is called on each parallel instance.
+ * Partitioners must be serializable!
+ */
+public abstract class KafkaPartitioner<T> implements Serializable {
+
+	private static final long serialVersionUID = -1974260817778593473L;
+
+	/**
+	 * Initializer for the Partitioner.
+	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * @param parallelInstances the total number of parallel instances
+	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
+	 */
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		// overwrite this method if needed.
+	}
+
+	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
+}
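
As a sketch of extending this class, a hypothetical partitioner (not part of this commit) that routes records by the hash of their serialized key:

    import java.util.Arrays;

    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    public class KeyHashPartitioner<T> extends KafkaPartitioner<T> {

        private static final long serialVersionUID = 1L;

        @Override
        public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
            if (serializedKey == null) {
                return 0; // records without a key all go to partition 0
            }
            // mask the sign bit so the modulo is always non-negative
            return (Arrays.hashCode(serializedKey) & 0x7fffffff) % numPartitions;
        }
    }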

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
new file mode 100644
index 0000000..d170058
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an ObjectNode.
+ * <p>
+ * Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
+ */
+public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
+	private ObjectMapper mapper;
+
+	@Override
+	public ObjectNode deserialize(byte[] message) throws IOException {
+		if (mapper == null) {
+			mapper = new ObjectMapper();
+		}
+		return mapper.readValue(message, ObjectNode.class);
+	}
+
+	@Override
+	public boolean isEndOfStream(ObjectNode nextElement) {
+		return false;
+	}
+
+}
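
A hedged end-to-end sketch; the broker address, topic, and group id are placeholders, and FlinkKafkaConsumer09 stands in for whichever version-specific consumer is actually on the classpath:

    import java.util.Properties;

    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;

    public class JsonConsumerSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "json-sketch");

            DataStream<ObjectNode> stream = env.addSource(
                    new FlinkKafkaConsumer09<>("my-topic", new JSONDeserializationSchema(), props));

            // fields of each ObjectNode can be read downstream,
            // e.g. node.get("name").asText() inside a MapFunction
            stream.print();

            env.execute("JSON consumer sketch");
        }
    }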

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
new file mode 100644
index 0000000..261a111
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an ObjectNode.
+ * <p>
+ * Key fields can be accessed by calling objectNode.get("key").get(&lt;name>).as(&lt;type>)
+ * <p>
+ * Value fields can be accessed by calling objectNode.get("value").get(&lt;name>).as(&lt;type>)
+ * <p>
+ * Metadata fields can be accessed by calling objectNode.get("metadata").get(&lt;name>).as(&lt;type>) and include
+ * the "offset" (long), "topic" (String) and "partition" (int).
+ */
+public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
+	private final boolean includeMetadata;
+	private ObjectMapper mapper;
+
+	public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
+		this.includeMetadata = includeMetadata;
+	}
+
+	@Override
+	public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+		if (mapper == null) {
+			mapper = new ObjectMapper();
+		}
+		ObjectNode node = mapper.createObjectNode();
+		node.set("key", mapper.readValue(messageKey, JsonNode.class));
+		node.set("value", mapper.readValue(message, JsonNode.class));
+		if (includeMetadata) {
+			node.putObject("metadata")
+				.put("offset", offset)
+				.put("topic", topic)
+				.put("partition", partition);
+		}
+		return node;
+	}
+
+	@Override
+	public boolean isEndOfStream(ObjectNode nextElement) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<ObjectNode> getProducedType() {
+		return getForClass(ObjectNode.class);
+	}
+}
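
Building on the previous sketch (same env, props, and version-specific consumer assumptions), with metadata enabled via the constructor flag:

    DataStream<ObjectNode> stream = env.addSource(
            new FlinkKafkaConsumer09<>("my-topic", new JSONKeyValueDeserializationSchema(true), props));

    // each record unpacks as:
    //   node.get("key")      - the message key as JSON
    //   node.get("value")    - the message value as JSON
    //   node.get("metadata") - "offset" (long), "topic" (String), "partition" (int)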

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
new file mode 100644
index 0000000..4344810
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Deserialization schema from JSON to {@link Row}.
+ *
+ * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
+ * the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
+
+	/** Field names to parse. Indices match fieldTypes indices. */
+	private final String[] fieldNames;
+
+	/** Types to parse fields as. Indices match fieldNames indices. */
+	private final TypeInformation<?>[] fieldTypes;
+
+	/** Object mapper for parsing the JSON. */
+	private final ObjectMapper objectMapper = new ObjectMapper();
+
+	/** Flag indicating whether to fail on a missing field. */
+	private boolean failOnMissingField;
+
+	/**
+	 * Creates a JSON deserialization schema for the given fields and type classes.
+	 *
+	 * @param fieldNames Names of JSON fields to parse.
+	 * @param fieldTypes Type classes to parse JSON fields as.
+	 */
+	public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] fieldTypes) {
+		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
+
+		this.fieldTypes = new TypeInformation[fieldTypes.length];
+		for (int i = 0; i < fieldTypes.length; i++) {
+			this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]);
+		}
+
+		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+				"Number of provided field names and types does not match.");
+	}
+
+	/**
+	 * Creates a JSON deserialization schema for the given fields and types.
+	 *
+	 * @param fieldNames Names of JSON fields to parse.
+	 * @param fieldTypes Types to parse JSON fields as.
+	 */
+	public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
+		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
+
+		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+				"Number of provided field names and types does not match.");
+	}
+
+	@Override
+	public Row deserialize(byte[] message) throws IOException {
+		try {
+			JsonNode root = objectMapper.readTree(message);
+
+			Row row = new Row(fieldNames.length);
+			for (int i = 0; i < fieldNames.length; i++) {
+				JsonNode node = root.get(fieldNames[i]);
+
+				if (node == null) {
+					if (failOnMissingField) {
+						throw new IllegalStateException("Failed to find field with name '"
+								+ fieldNames[i] + "'.");
+					} else {
+						row.setField(i, null);
+					}
+				} else {
+					// Read the value as specified type
+					Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
+					row.setField(i, value);
+				}
+			}
+
+			return row;
+		} catch (Throwable t) {
+			throw new IOException("Failed to deserialize JSON object.", t);
+		}
+	}
+
+	@Override
+	public boolean isEndOfStream(Row nextElement) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<Row> getProducedType() {
+		return new RowTypeInfo(fieldTypes);
+	}
+
+	/**
+	 * Configures the failure behaviour if a JSON field is missing.
+	 *
+	 * <p>By default, a missing field is ignored and the field is set to null.
+	 *
+	 * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+	 */
+	public void setFailOnMissingField(boolean failOnMissingField) {
+		this.failOnMissingField = failOnMissingField;
+	}
+
+}
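
A short sketch of the schema in isolation (field names, types, and the JSON payload are made up; deserialize(...) declares IOException, so the surrounding code must handle it):

    JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(
            new String[] {"id", "name"},
            new Class<?>[] {Long.class, String.class});

    // by default a missing field simply becomes null; opt into strict behaviour instead
    schema.setFailOnMissingField(true);

    Row row = schema.deserialize("{\"id\": 1, \"name\": \"flink\"}".getBytes());
    // row.productElement(0) -> 1L, row.productElement(1) -> "flink"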

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
new file mode 100644
index 0000000..077ff13
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.util.Preconditions;
+
+
+/**
+ * Serialization schema that serializes an object into JSON bytes.
+ *
+ * <p>Serializes the input {@link Row} object into a JSON string and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using
+ * {@link JsonRowDeserializationSchema}.
+ */
+public class JsonRowSerializationSchema implements SerializationSchema<Row> {
+	/** Field names in the input Row object */
+	private final String[] fieldNames;
+	/** Object mapper that is used to create output JSON objects */
+	private static final ObjectMapper mapper = new ObjectMapper();
+
+	/**
+	 * Creates a JSON serialization schema for the given fields and types.
+	 *
+	 * @param fieldNames Names of JSON fields to parse.
+	 */
+	public JsonRowSerializationSchema(String[] fieldNames) {
+		this.fieldNames = Preconditions.checkNotNull(fieldNames);
+	}
+
+	@Override
+	public byte[] serialize(Row row) {
+		if (row.productArity() != fieldNames.length) {
+			throw new IllegalStateException(String.format(
+				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
+		}
+
+		ObjectNode objectNode = mapper.createObjectNode();
+
+		for (int i = 0; i < row.productArity(); i++) {
+			JsonNode node = mapper.valueToTree(row.productElement(i));
+			objectNode.set(fieldNames[i], node);
+		}
+
+		try {
+			return mapper.writeValueAsBytes(objectNode);
+		} catch (Exception e) {
+			throw new RuntimeException("Failed to serialize row", e);
+		}
+	}
+}
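
And the matching serialization direction, again with made-up field names:

    JsonRowSerializationSchema schema = new JsonRowSerializationSchema(new String[] {"id", "name"});

    Row row = new Row(2);
    row.setField(0, 1L);
    row.setField(1, "flink");

    byte[] json = schema.serialize(row); // {"id":1,"name":"flink"} as UTF-8 bytes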

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
new file mode 100644
index 0000000..01e72ca
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the byte key / value messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
+ * processed by Flink.
+ * 
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set)
+	 * @param message The message, as a byte array (null if the message was empty or deleted)
+	 * @param topic The topic the message has originated from
+	 * @param partition The partition the message has originated from
+	 * @param offset the offset of the message in the original source (for example the Kafka offset)
+	 * @return The deserialized message as an object.
+	 */
+	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted.
+	 * 
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return True, if the element signals end of stream, false otherwise.
+	 */
+	boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..4b9dba2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema
+ * interface.
+ * @param <T> The type created by the deserialization schema.
+ */
+public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
+
+	private static final long serialVersionUID = 2651665280744549932L;
+
+	private final DeserializationSchema<T> deserializationSchema;
+
+	public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+		this.deserializationSchema = deserializationSchema;
+	}
+	@Override
+	public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+		return deserializationSchema.deserialize(message);
+	}
+
+	@Override
+	public boolean isEndOfStream(T nextElement) {
+		return deserializationSchema.isEndOfStream(nextElement);
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return deserializationSchema.getProducedType();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
new file mode 100644
index 0000000..701281e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+/**
+ * The serialization schema describes how to turn a data object into a different serialized
+ * representation. Most data sinks (for example Apache Kafka) require the data to be handed
+ * to them in a specific format (for example as byte strings).
+ * 
+ * @param <T> The type to be serialized.
+ */
+public interface KeyedSerializationSchema<T> extends Serializable {
+
+	/**
+	 * Serializes the key of the incoming element to a byte array.
+	 * This method might return null if no key is available.
+	 *
+	 * @param element The incoming element to be serialized
+	 * @return the key of the element as a byte array
+	 */
+	byte[] serializeKey(T element);
+
+	/**
+	 * Serializes the value of the incoming element to a byte array.
+	 * 
+	 * @param element The incoming element to be serialized
+	 * @return the value of the element as a byte array
+	 */
+	byte[] serializeValue(T element);
+
+	 * Optional method to determine the target topic for the element.
+	 * Optional method to determine the target topic for the element
+	 *
+	 * @param element Incoming element to determine the target topic from
+	 * @return null or the target topic
+	 */
+	String getTargetTopic(T element);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
new file mode 100644
index 0000000..1b3e486
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+/**
+ * A simple wrapper for using the SerializationSchema with the KeyedSerializationSchema
+ * interface.
+ * @param <T> The type to serialize
+ */
+public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
+
+	private static final long serialVersionUID = 1351665280744549933L;
+
+	private final SerializationSchema<T> serializationSchema;
+
+	public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema) {
+		this.serializationSchema = serializationSchema;
+	}
+
+	@Override
+	public byte[] serializeKey(T element) {
+		return null;
+	}
+
+	@Override
+	public byte[] serializeValue(T element) {
+		return serializationSchema.serialize(element);
+	}
+
+	@Override
+	public String getTargetTopic(T element) {
+		return null; // we are never overriding the topic
+	}
+}
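
As a usage sketch, a value-only SerializationSchema can be adapted for the keyed interface like this (SimpleStringSchema is assumed to be available from the same serialization package):

    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    // Keys stay null and the producer's default topic is used.
    KeyedSerializationSchema<String> schema =
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());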

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
new file mode 100644
index 0000000..51bc8d1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serialization and deserialization schema for key-value pairs that uses Flink's serialization stack to
+ * transform typed records from and to byte arrays.
+ * 
+ * @param <K> The key type to be serialized.
+ * @param <V> The value type to be serialized.
+ */
+public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> {
+
+	private static final long serialVersionUID = -5359448468131559102L;
+
+	/** The serializer for the key */
+	private final TypeSerializer<K> keySerializer;
+
+	/** The serializer for the value */
+	private final TypeSerializer<V> valueSerializer;
+
+	/** reusable input deserialization buffer */
+	private final DataInputDeserializer inputDeserializer;
+	
+	/** reusable output serialization buffer for the key */
+	private transient DataOutputSerializer keyOutputSerializer;
+
+	/** reusable output serialization buffer for the value */
+	private transient DataOutputSerializer valueOutputSerializer;
+	
+	
+	/** The type information, to be returned by {@link #getProducedType()}. It is
+	 * transient, because it is not serializable. Note that this means that the type information
+	 * is no longer available once this schema has been serialized and shipped to a remote task. */
+	private final transient TypeInformation<Tuple2<K, V>> typeInfo;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new de-/serialization schema for the given types.
+	 *
+	 * @param keyTypeInfo The type information for the key type de-/serialized by this schema.
+	 * @param valueTypeInfo The type information for the value type de-/serialized by this schema.
+	 * @param ec The execution config, which is used to parametrize the type serializers.
+	 */
+	public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
+		this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
+		this.keySerializer = keyTypeInfo.createSerializer(ec);
+		this.valueSerializer = valueTypeInfo.createSerializer(ec);
+		this.inputDeserializer = new DataInputDeserializer();
+	}
+
+	/**
+	 * Creates a new de-/serialization schema for the given types. This constructor accepts the types
+	 * as classes and internally constructs the type information from the classes.
+	 * 
+	 * <p>If the types are parametrized and cannot be fully defined via classes, use the constructor
+	 * that accepts {@link TypeInformation} instead.
+	 * 
+	 * @param keyClass The class of the key de-/serialized by this schema.
+	 * @param valueClass The class of the value de-/serialized by this schema.
+	 * @param config The execution config, which is used to parametrize the type serializers.
+	 */
+	public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) {
+		this(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass), config);
+	}
+
+	// ------------------------------------------------------------------------
+
+
+	@Override
+	public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+		K key = null;
+		V value = null;
+		
+		if (messageKey != null) {
+			inputDeserializer.setBuffer(messageKey, 0, messageKey.length);
+			key = keySerializer.deserialize(inputDeserializer);
+		}
+		if (message != null) {
+			inputDeserializer.setBuffer(message, 0, message.length);
+			value = valueSerializer.deserialize(inputDeserializer);
+		}
+		return new Tuple2<>(key, value);
+	}
+
+	/**
+	 * This schema never considers an element to signal end-of-stream, so this method always returns false.
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return Returns false.
+	 */
+	@Override
+	public boolean isEndOfStream(Tuple2<K,V> nextElement) {
+		return false;
+	}
+
+
+	@Override
+	public byte[] serializeKey(Tuple2<K, V> element) {
+		if (element.f0 == null) {
+			return null;
+		} else {
+			// key is not null. serialize it:
+			if (keyOutputSerializer == null) {
+				keyOutputSerializer = new DataOutputSerializer(16);
+			}
+			try {
+				keySerializer.serialize(element.f0, keyOutputSerializer);
+			}
+			catch (IOException e) {
+				throw new RuntimeException("Unable to serialize record", e);
+			}
+			// the backing array may be larger than the bytes actually written; trim to the written length
+			byte[] res = keyOutputSerializer.getByteArray();
+			if (res.length != keyOutputSerializer.length()) {
+				byte[] n = new byte[keyOutputSerializer.length()];
+				System.arraycopy(res, 0, n, 0, keyOutputSerializer.length());
+				res = n;
+			}
+			keyOutputSerializer.clear();
+			return res;
+		}
+	}
+
+	@Override
+	public byte[] serializeValue(Tuple2<K, V> element) {
+		// if the value is null, its serialized value is null as well.
+		if (element.f1 == null) {
+			return null;
+		}
+
+		if (valueOutputSerializer == null) {
+			valueOutputSerializer = new DataOutputSerializer(16);
+		}
+
+		try {
+			valueSerializer.serialize(element.f1, valueOutputSerializer);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Unable to serialize record", e);
+		}
+
+		byte[] res = valueOutputSerializer.getByteArray();
+		if (res.length != valueOutputSerializer.length()) {
+			byte[] n = new byte[valueOutputSerializer.length()];
+			System.arraycopy(res, 0, n, 0, valueOutputSerializer.length());
+			res = n;
+		}
+		valueOutputSerializer.clear();
+		return res;
+	}
+
+	@Override
+	public String getTargetTopic(Tuple2<K, V> element) {
+		return null; // we are never overriding the topic
+	}
+
+
+	@Override
+	public TypeInformation<Tuple2<K,V>> getProducedType() {
+		if (typeInfo != null) {
+			return typeInfo;
+		}
+		else {
+			throw new IllegalStateException(
+					"The type information is not available after this class has been serialized and distributed.");
+		}
+	}
+}
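
A possible way to wire this schema up is sketched below; `env`, the topic name, the properties, and the version-specific FlinkKafkaConsumer08 class are assumptions used only for illustration:

    // Sketch: key/value schema for Tuple2<Long, String> records, built from classes.
    TypeInformationKeyValueSerializationSchema<Long, String> schema =
            new TypeInformationKeyValueSerializationSchema<>(Long.class, String.class, env.getConfig());

    // The schema implements both the keyed serialization and deserialization interfaces,
    // so the same instance can be handed to a Kafka consumer or producer.
    DataStream<Tuple2<Long, String>> stream =
            env.addSource(new FlinkKafkaConsumer08<>("topic", schema, properties));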

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
new file mode 100644
index 0000000..b96ba30
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FlinkKafkaConsumerBaseTest {
+
+	/**
+	 * Tests that periodic and punctuated timestamp extractors / watermark generators cannot both be used.
+	 */
+	@Test
+	public void testEitherWatermarkExtractor() {
+		try {
+			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
+			fail();
+		} catch (NullPointerException ignored) {}
+
+		try {
+			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
+			fail();
+		} catch (NullPointerException ignored) {}
+		
+		@SuppressWarnings("unchecked")
+		final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
+		@SuppressWarnings("unchecked")
+		final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
+		
+		DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
+		c1.assignTimestampsAndWatermarks(periodicAssigner);
+		try {
+			c1.assignTimestampsAndWatermarks(punctuatedAssigner);
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
+		c2.assignTimestampsAndWatermarks(punctuatedAssigner);
+		try {
+			c2.assignTimestampsAndWatermarks(periodicAssigner);
+			fail();
+		} catch (IllegalStateException ignored) {}
+	}
+
+	/**
+	 * Tests that no checkpoints happen when the fetcher is not running.
+	 */
+	@Test
+	public void ignoreCheckpointWhenNotRunning() throws Exception {
+		@SuppressWarnings("unchecked")
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
+		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
+
+		assertFalse(listState.get().iterator().hasNext());
+		consumer.notifyCheckpointComplete(66L);
+	}
+
+	/**
+	 * Tests that a snapshot taken before the fetcher is running returns the previously restored state.
+	 */
+	@Test
+	public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+
+		TestingListState<Serializable> listState = new TestingListState<>();
+		listState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L));
+		listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L));
+
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+		when(initializationContext.isRestored()).thenReturn(true);
+
+		consumer.initializeState(initializationContext);
+
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
+
+		// ensure that the list was cleared and refilled. While this is an implementation detail, we use it here
+		// to figure out that snapshotState() actually did something.
+		Assert.assertTrue(listState.isClearCalled());
+
+		Set<Serializable> expected = new HashSet<>();
+
+		for (Serializable serializable : listState.get()) {
+			expected.add(serializable);
+		}
+
+		int counter = 0;
+
+		for (Serializable serializable : listState.get()) {
+			assertTrue(expected.contains(serializable));
+			counter++;
+		}
+
+		assertEquals(expected.size(), counter);
+	}
+
+	/**
+	 * Tests that a snapshot taken before the fetcher is running, and without restored state, contains no offsets.
+	 */
+	@Test
+	public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		TestingListState<Serializable> listState = new TestingListState<>();
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+		when(initializationContext.isRestored()).thenReturn(false);
+
+		consumer.initializeState(initializationContext);
+
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
+
+		assertFalse(listState.get().iterator().hasNext());
+	}
+
+	/**
+	 * Tests that, when there is no restored checkpoint state, the consumer does not attempt to restore offsets into the fetcher.
+	 */
+	@Test
+	public void checkUseFetcherWhenNoCheckpoint() throws Exception {
+
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+		List<KafkaTopicPartition> partitionList = new ArrayList<>(1);
+		partitionList.add(new KafkaTopicPartition("test", 0));
+		consumer.setSubscribedPartitions(partitionList);
+
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		TestingListState<Serializable> listState = new TestingListState<>();
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+
+		// make the context signal that there is no restored state, then validate that
+		when(initializationContext.isRestored()).thenReturn(false);
+		consumer.initializeState(initializationContext);
+		consumer.run(mock(SourceFunction.SourceContext.class));
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSnapshotState() throws Exception {
+
+		// --------------------------------------------------------------------
+		//   prepare fake states
+		// --------------------------------------------------------------------
+
+		final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
+		state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+		state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+		final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
+		state2.put(new KafkaTopicPartition("abc", 13), 16770L);
+		state2.put(new KafkaTopicPartition("def", 7), 987654329L);
+
+		final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
+		state3.put(new KafkaTopicPartition("abc", 13), 16780L);
+		state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+
+		// --------------------------------------------------------------------
+		
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+			
+		final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+	
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
+		assertEquals(0, pendingOffsetsToCommit.size());
+
+		OperatorStateStore backend = mock(OperatorStateStore.class);
+
+		TestingListState<Serializable> listState = new TestingListState<>();
+
+		when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+
+		when(initializationContext.getOperatorStateStore()).thenReturn(backend);
+		when(initializationContext.isRestored()).thenReturn(false, true, true, true);
+
+		consumer.initializeState(initializationContext);
+
+		// checkpoint 1
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
+
+		HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
+
+		for (Serializable serializable : listState.get()) {
+			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
+			snapshot1.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+		}
+
+		assertEquals(state1, snapshot1);
+		assertEquals(1, pendingOffsetsToCommit.size());
+		assertEquals(state1, pendingOffsetsToCommit.get(138L));
+
+		// checkpoint 2
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140));
+
+		HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
+
+		for (Serializable serializable : listState.get()) {
+			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
+			snapshot2.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+		}
+
+		assertEquals(state2, snapshot2);
+		assertEquals(2, pendingOffsetsToCommit.size());
+		assertEquals(state2, pendingOffsetsToCommit.get(140L));
+		
+		// ack checkpoint 1
+		consumer.notifyCheckpointComplete(138L);
+		assertEquals(1, pendingOffsetsToCommit.size());
+		assertTrue(pendingOffsetsToCommit.containsKey(140L));
+
+		// checkpoint 3
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
+
+		HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
+
+		for (Serializable serializable : listState.get()) {
+			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
+			snapshot3.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+		}
+
+		assertEquals(state3, snapshot3);
+		assertEquals(2, pendingOffsetsToCommit.size());
+		assertEquals(state3, pendingOffsetsToCommit.get(141L));
+		
+		// ack checkpoint 3, subsumes number 2
+		consumer.notifyCheckpointComplete(141L);
+		assertEquals(0, pendingOffsetsToCommit.size());
+
+
+		consumer.notifyCheckpointComplete(666); // invalid checkpoint
+		assertEquals(0, pendingOffsetsToCommit.size());
+
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		listState = new TestingListState<>();
+		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+
+		// create 500 snapshots
+		for (int i = 100; i < 600; i++) {
+			consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
+			listState.clear();
+		}
+		assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingOffsetsToCommit.size());
+
+		// commit only the second last
+		consumer.notifyCheckpointComplete(598);
+		assertEquals(1, pendingOffsetsToCommit.size());
+
+		// access invalid checkpoint
+		consumer.notifyCheckpointComplete(590);
+
+		// and the last
+		consumer.notifyCheckpointComplete(599);
+		assertEquals(0, pendingOffsetsToCommit.size());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
+			AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
+	{
+		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+
+		Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
+		fetcherField.setAccessible(true);
+		fetcherField.set(consumer, fetcher);
+
+		Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
+		mapField.setAccessible(true);
+		mapField.set(consumer, pendingOffsetsToCommit);
+
+		Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
+		runningField.setAccessible(true);
+		runningField.set(consumer, running);
+
+		return consumer;
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings("unchecked")
+		public DummyFlinkKafkaConsumer() {
+			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+		}
+
+		@Override
+		protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
+			AbstractFetcher<T, ?> fetcher = mock(AbstractFetcher.class);
+			doAnswer(new Answer() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					Assert.fail("Trying to restore offsets even though there was no restore state.");
+					return null;
+				}
+			}).when(fetcher).restoreOffsets(any(HashMap.class));
+			return fetcher;
+		}
+
+		@Override
+		protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
+			return Collections.emptyList();
+		}
+
+		@Override
+		public RuntimeContext getRuntimeContext() {
+			return mock(StreamingRuntimeContext.class);
+		}
+	}
+
+	private static final class TestingListState<T> implements ListState<T> {
+
+		private final List<T> list = new ArrayList<>();
+		private boolean clearCalled = false;
+
+		@Override
+		public void clear() {
+			list.clear();
+			clearCalled = true;
+		}
+
+		@Override
+		public Iterable<T> get() throws Exception {
+			return list;
+		}
+
+		@Override
+		public void add(T value) throws Exception {
+			list.add(value);
+		}
+
+		public List<T> getList() {
+			return list;
+		}
+
+		public boolean isClearCalled() {
+			return clearCalled;
+		}
+	}
+}