You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:22 UTC
[32/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
new file mode 100644
index 0000000..c68fe28
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink's description of a partition in a Kafka topic.
+ * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...)
+ *
+ * <p>Note: This class must not change in its structure, because it would change the
+ * serialization format and make previous savepoints unreadable.
+ */
+public final class KafkaTopicPartition implements Serializable {
+
+ /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
+ * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
+ private static final long serialVersionUID = 722083576322742325L;
+
+ // ------------------------------------------------------------------------
+
+ private final String topic;
+ private final int partition;
+ private final int cachedHash;
+
+ public KafkaTopicPartition(String topic, int partition) {
+ this.topic = requireNonNull(topic);
+ this.partition = partition;
+ this.cachedHash = 31 * topic.hashCode() + partition;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "KafkaTopicPartition{" +
+ "topic='" + topic + '\'' +
+ ", partition=" + partition +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ else if (o instanceof KafkaTopicPartition) {
+ KafkaTopicPartition that = (KafkaTopicPartition) o;
+ return this.partition == that.partition && this.topic.equals(that.topic);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return cachedHash;
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ public static String toString(Map<KafkaTopicPartition, Long> map) {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
+ KafkaTopicPartition ktp = p.getKey();
+ sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", ");
+ }
+ return sb.toString();
+ }
+
+ public static String toString(List<KafkaTopicPartition> partitions) {
+ StringBuilder sb = new StringBuilder();
+ for (KafkaTopicPartition p: partitions) {
+ sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", ");
+ }
+ return sb.toString();
+ }
+
+
+ public static List<KafkaTopicPartition> dropLeaderData(List<KafkaTopicPartitionLeader> partitionInfos) {
+ List<KafkaTopicPartition> ret = new ArrayList<>(partitionInfos.size());
+ for(KafkaTopicPartitionLeader ktpl: partitionInfos) {
+ ret.add(ktpl.getTopicPartition());
+ }
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
new file mode 100644
index 0000000..1959a05
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.kafka.common.Node;
+
+import java.io.Serializable;
+
+/**
+ * Serializable Topic Partition info with leader Node information.
+ * This class is used at runtime.
+ *
+ * <p>The leader is kept as its id, port and host. When no leader is known, the id and
+ * port are -1 and the host is null.
+ */
+public class KafkaTopicPartitionLeader implements Serializable {
+
+	private static final long serialVersionUID = 9145855900303748582L;
+
+	private final int leaderId;
+	private final int leaderPort;
+	private final String leaderHost;
+	private final KafkaTopicPartition topicPartition;
+	private final int cachedHash;
+
+	/**
+	 * Creates the partition info.
+	 *
+	 * @param topicPartition the partition described by this object
+	 * @param leader the current leader of the partition, or null if it is unknown
+	 */
+	public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) {
+		this.topicPartition = topicPartition;
+		if (leader == null) {
+			this.leaderId = -1;
+			this.leaderHost = null;
+			this.leaderPort = -1;
+		} else {
+			this.leaderId = leader.id();
+			this.leaderPort = leader.port();
+			this.leaderHost = leader.host();
+		}
+		// Hash exactly the fields that equals() compares, so that equal instances always
+		// get equal hash codes (Node.hashCode() may depend on other Node attributes).
+		int leaderHash = 31 * leaderId + leaderPort;
+		leaderHash = 31 * leaderHash + (leaderHost == null ? 0 : leaderHost.hashCode());
+		this.cachedHash = 31 * leaderHash + topicPartition.hashCode();
+	}
+
+	/** @return the partition described by this object. */
+	public KafkaTopicPartition getTopicPartition() {
+		return topicPartition;
+	}
+
+	/**
+	 * Returns a new {@link Node} built from the stored leader id, host and port,
+	 * or null if the leader id is -1 (no leader known).
+	 */
+	public Node getLeader() {
+		if (this.leaderId == -1) {
+			return null;
+		} else {
+			return new Node(leaderId, leaderHost, leaderPort);
+		}
+	}
+
+	/**
+	 * Two instances are equal if they describe the same partition and the same leader
+	 * id, port and host. Instances without a leader (null host) compare safely.
+	 */
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KafkaTopicPartitionLeader)) {
+			return false;
+		}
+
+		KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
+
+		if (!topicPartition.equals(that.topicPartition)) {
+			return false;
+		}
+		// leaderHost is null when no leader was known, so it must be compared null-safely
+		return leaderId == that.leaderId
+				&& leaderPort == that.leaderPort
+				&& (leaderHost == null ? that.leaderHost == null : leaderHost.equals(that.leaderHost));
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+	@Override
+	public String toString() {
+		return "KafkaTopicPartitionLeader{" +
+				"leaderId=" + leaderId +
+				", leaderPort=" + leaderPort +
+				", leaderHost='" + leaderHost + '\'' +
+				", topic=" + topicPartition.getTopic() +
+				", partition=" + topicPartition.getPartition() +
+				'}';
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
new file mode 100644
index 0000000..7cb5f46
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * Per-partition state held by the Flink Kafka Consumer.
+ *
+ * <p>It pairs Flink's descriptor of the partition with the Kafka-version-specific
+ * partition handle. It also tracks two offsets: the offset of the last processed record
+ * and the committed offset. Both start as {@link #OFFSET_NOT_SET}.
+ *
+ * <p>This class only holds the most basic state (the offsets). Subclasses add more
+ * elaborate state, such as current watermarks and timestamp extractors.
+ *
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public class KafkaTopicPartitionState<KPH> {
+
+	/**
+	 * Marker for an offset that has not been set. Kafka does not use negative offsets
+	 * (they are invalid), and this particular value was picked in the hope that Kafka
+	 * does not use it as a magic number for anything else either.
+	 */
+	public static final long OFFSET_NOT_SET = -915623761776L;
+
+	// ------------------------------------------------------------------------
+
+	/** Flink's descriptor of this partition. */
+	private final KafkaTopicPartition partition;
+
+	/** Kafka's descriptor of this partition (its type depends on the Kafka version). */
+	private final KPH kafkaPartitionHandle;
+
+	/** Offset of the last record that was already processed. */
+	private volatile long offset;
+
+	/** Offset that has been committed for this partition. */
+	private volatile long committedOffset;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the state for one partition, with both offsets set to {@link #OFFSET_NOT_SET}.
+	 *
+	 * @param partition Flink's descriptor of the partition
+	 * @param kafkaPartitionHandle Kafka's descriptor of the partition
+	 */
+	public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
+		this.kafkaPartitionHandle = kafkaPartitionHandle;
+		this.partition = partition;
+		this.committedOffset = OFFSET_NOT_SET;
+		this.offset = OFFSET_NOT_SET;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** @return Flink's descriptor of the partition. */
+	public final KafkaTopicPartition getKafkaTopicPartition() {
+		return this.partition;
+	}
+
+	/** @return Kafka's (version-specific) descriptor of the partition. */
+	public final KPH getKafkaPartitionHandle() {
+		return this.kafkaPartitionHandle;
+	}
+
+	/** @return the name of the topic that the partition belongs to. */
+	public final String getTopic() {
+		return this.partition.getTopic();
+	}
+
+	/** @return the number of the partition within its topic. */
+	public final int getPartition() {
+		return this.partition.getPartition();
+	}
+
+	/**
+	 * Returns the current offset in the partition: the offset of the last element that was
+	 * retrieved and emitted successfully. This is the offset that should be stored in a
+	 * checkpoint.
+	 */
+	public final long getOffset() {
+		return this.offset;
+	}
+
+	/** Sets the offset of the last processed record. */
+	public final void setOffset(long offset) {
+		this.offset = offset;
+	}
+
+	/** @return true if the processed offset holds a value other than {@link #OFFSET_NOT_SET}. */
+	public final boolean isOffsetDefined() {
+		return this.offset != OFFSET_NOT_SET;
+	}
+
+	/** Sets the committed offset. */
+	public final void setCommittedOffset(long offset) {
+		this.committedOffset = offset;
+	}
+
+	/** @return the committed offset, or {@link #OFFSET_NOT_SET} if none was set. */
+	public final long getCommittedOffset() {
+		return this.committedOffset;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		final String offsetText = isOffsetDefined() ? String.valueOf(this.offset) : "(not set)";
+		return "Partition: " + this.partition
+				+ ", KafkaPartitionHandle=" + this.kafkaPartitionHandle
+				+ ", offset=" + offsetText;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
new file mode 100644
index 0000000..efdc73f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * Per-Kafka-partition state that, besides the offsets of the base class, holds a
+ * periodic watermark generator (and timestamp extractor) for the partition.
+ *
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+
+	/** Extracts record timestamps and generates watermarks for this partition. */
+	private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
+
+	/** Highest watermark timestamp observed for this partition; starts at Long.MIN_VALUE. */
+	private long partitionWatermark;
+
+	// ------------------------------------------------------------------------
+
+	public KafkaTopicPartitionStateWithPeriodicWatermarks(
+			KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+			AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
+	{
+		super(partition, kafkaPartitionHandle);
+		this.partitionWatermark = Long.MIN_VALUE;
+		this.timestampsAndWatermarks = timestampsAndWatermarks;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Delegates to the assigner, passing the record and the timestamp Kafka attached to it. */
+	public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
+		return this.timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
+	}
+
+	/**
+	 * Polls the assigner for its current watermark and returns the highest watermark
+	 * timestamp seen so far. A null watermark or one lower than that maximum leaves it unchanged.
+	 */
+	public long getCurrentWatermarkTimestamp() {
+		final Watermark candidate = this.timestampsAndWatermarks.getCurrentWatermark();
+		if (candidate != null) {
+			final long candidateTimestamp = candidate.getTimestamp();
+			if (candidateTimestamp > this.partitionWatermark) {
+				this.partitionWatermark = candidateTimestamp;
+			}
+		}
+		return this.partitionWatermark;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()
+				+ ", offset=" + getOffset()
+				+ ", watermark=" + this.partitionWatermark;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
new file mode 100644
index 0000000..edf40ce
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+
+/**
+ * Per-Kafka-partition state that, besides the offsets of the base class, holds a
+ * punctuated watermark generator (and timestamp extractor) for the partition.
+ *
+ * <p>This class is not thread safe, but it gives volatile access to the current
+ * partition watermark ({@link #getCurrentPartitionWatermark()}).
+ *
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
+ */
+public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+
+	/** Extracts record timestamps and generates watermarks for this partition. */
+	private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks;
+
+	/** Timestamp of the last watermark emitted for this partition; starts at Long.MIN_VALUE. */
+	private volatile long partitionWatermark;
+
+	// ------------------------------------------------------------------------
+
+	public KafkaTopicPartitionStateWithPunctuatedWatermarks(
+			KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+			AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks)
+	{
+		super(partition, kafkaPartitionHandle);
+		this.partitionWatermark = Long.MIN_VALUE;
+		this.timestampsAndWatermarks = timestampsAndWatermarks;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Delegates to the assigner, passing the record and the timestamp Kafka attached to it. */
+	public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
+		return this.timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
+	}
+
+	/**
+	 * Asks the assigner whether the given record yields a watermark.
+	 *
+	 * @return the new watermark if it is strictly higher than this partition's current
+	 *         watermark (which is then advanced to it); otherwise null
+	 */
+	@Nullable
+	public Watermark checkAndGetNewWatermark(T record, long timestamp) {
+		final Watermark candidate = this.timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
+		if (candidate == null || candidate.getTimestamp() <= this.partitionWatermark) {
+			// no watermark, or one that would not advance this partition's watermark
+			return null;
+		}
+		this.partitionWatermark = candidate.getTimestamp();
+		return candidate;
+	}
+
+	/** @return the timestamp of the last watermark emitted for this partition. */
+	public long getCurrentPartitionWatermark() {
+		return this.partitionWatermark;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition()
+				+ ", offset=" + getOffset()
+				+ ", watermark=" + this.partitionWatermark;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
new file mode 100644
index 0000000..7a41ade
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class TypeUtil {
+ private TypeUtil() {}
+
+ /**
+ * Creates TypeInformation array for an array of Classes.
+ * @param fieldTypes classes to extract type information from
+ * @return type information
+ */
+ public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
+ TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length];
+ for (int i = 0; i < fieldTypes.length; i++) {
+ typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
+ }
+ return typeInfos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
new file mode 100644
index 0000000..cedb696
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * Flink {@link Gauge} that exposes the current value of a Kafka metric.
+ *
+ * <p>The wrapped metric is queried on every call to {@link #getValue()}; no value is cached.
+ */
+public class KafkaMetricWrapper implements Gauge<Double> {
+
+	/** The Kafka metric whose value this gauge reports. */
+	private final org.apache.kafka.common.Metric kafkaMetric;
+
+	/**
+	 * @param metric the Kafka metric to expose
+	 */
+	public KafkaMetricWrapper(org.apache.kafka.common.Metric metric) {
+		this.kafkaMetric = metric;
+	}
+
+	@Override
+	public Double getValue() {
+		return Double.valueOf(this.kafkaMetric.value());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
new file mode 100644
index 0000000..9b848e0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * <p>Note, one Kafka partition can contain multiple Flink partitions.
+ *
+ * <p>Cases:
+ *
+ * <p>More Flink partitions than Kafka partitions:
+ * <pre>
+ *     Flink Sinks:        Kafka Partitions
+ *         1 ----------------> 1
+ *         2 --------------/
+ *         3 -------------/
+ *         4 ------------/
+ * </pre>
+ * Some (or all) Kafka partitions contain the output of more than one Flink partition.
+ *
+ * <p>Fewer Flink partitions than Kafka partitions:
+ * <pre>
+ *     Flink Sinks:        Kafka Partitions
+ *         1 ----------------> 1
+ *         2 ----------------> 2
+ *                             3
+ *                             4
+ *                             5
+ * </pre>
+ * Not all Kafka partitions contain data.
+ *
+ * <p>To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that
+ * this will cause a lot of network connections between all the Flink instances and all the
+ * Kafka brokers).
+ */
+public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
+	private static final long serialVersionUID = 1627268846962918126L;
+
+	/** The Kafka partition this parallel instance writes to; -1 until {@link #open} was called. */
+	private int targetPartition = -1;
+
+	/**
+	 * Picks the target partition {@code partitions[parallelInstanceId % partitions.length]}.
+	 *
+	 * @throws IllegalArgumentException if the instance id is negative, the number of
+	 *         instances is not positive, or no partitions are given
+	 */
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		if (parallelInstanceId < 0) {
+			throw new IllegalArgumentException(
+					"Id of the parallel instance must not be negative, but was " + parallelInstanceId);
+		}
+		if (parallelInstances <= 0) {
+			throw new IllegalArgumentException(
+					"Number of parallel instances must be positive, but was " + parallelInstances);
+		}
+		if (partitions.length == 0) {
+			throw new IllegalArgumentException("The array of available Kafka partitions must not be empty");
+		}
+
+		this.targetPartition = partitions[parallelInstanceId % partitions.length];
+	}
+
+	/**
+	 * Returns the partition selected in {@link #open}, independent of the record.
+	 *
+	 * @throws IllegalStateException if {@link #open} has not been called successfully
+	 */
+	@Override
+	public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+		if (targetPartition < 0) {
+			throw new IllegalStateException("The partitioner has not been initialized properly");
+		}
+		return targetPartition;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
new file mode 100644
index 0000000..37e2ef6
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * Base class for partitioners used when writing records to Kafka.
+ *
+ * <p>It contains an {@link #open(int, int, int[])} method, which is called on each parallel
+ * instance. Subclasses implement {@link #partition(Object, byte[], byte[], int)}, which
+ * receives a record, its serialized key and value, and the number of partitions.
+ * Partitioners must be serializable!
+ */
+public abstract class KafkaPartitioner<T> implements Serializable {
+
+	private static final long serialVersionUID = -1974260817778593473L;
+
+	/**
+	 * Initializer for the partitioner. The default implementation does nothing;
+	 * override this method if setup is needed.
+	 *
+	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * @param parallelInstances the total number of parallel instances
+	 * @param partitions an array describing the partition IDs of the available Kafka partitions
+	 */
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		// intentionally empty
+	}
+
+	/**
+	 * Chooses the partition for a record (presumably one of the IDs passed to {@link #open};
+	 * TODO confirm against the producer that calls this).
+	 *
+	 * @param next the record
+	 * @param serializedKey the serialized key of the record
+	 * @param serializedValue the serialized value of the record
+	 * @param numPartitions the number of partitions
+	 * @return the chosen partition
+	 */
+	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
new file mode 100644
index 0000000..d170058
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an ObjectNode.
+ * <p>
+ * Fields can be accessed by calling objectNode.get(<name>).as(<type>)
+ */
+public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
+ private ObjectMapper mapper;
+
+ @Override
+ public ObjectNode deserialize(byte[] message) throws IOException {
+ if (mapper == null) {
+ mapper = new ObjectMapper();
+ }
+ return mapper.readValue(message, ObjectNode.class);
+ }
+
+ @Override
+ public boolean isEndOfStream(ObjectNode nextElement) {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
new file mode 100644
index 0000000..261a111
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/**
+ * DeserializationSchema that deserializes a JSON key/value message into an {@link ObjectNode}.
+ *
+ * <p>Key fields can be accessed by calling {@code objectNode.get("key").get(<name>).as<Type>()}.
+ *
+ * <p>Value fields can be accessed by calling {@code objectNode.get("value").get(<name>).as<Type>()}.
+ *
+ * <p>Metadata fields can be accessed by calling {@code objectNode.get("metadata").get(<name>).as<Type>()}
+ * and include the "offset" (long), "topic" (String) and "partition" (int).
+ *
+ * <p>A message without a key ({@code messageKey == null}) produces no "key" field. A message
+ * without a value ({@code message == null}) produces no "value" field. Callers should check
+ * {@code objectNode.has("key")} / {@code objectNode.has("value")} before accessing them.
+ */
+public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
+ private final boolean includeMetadata;
+ private ObjectMapper mapper;
+
+ /**
+ * Creates a schema that parses both the message key and the message value as JSON.
+ *
+ * @param includeMetadata whether to add a "metadata" object holding offset, topic and partition
+ */
+ public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
+ this.includeMetadata = includeMetadata;
+ }
+
+ /**
+ * Builds an object node with "key", "value" and (optionally) "metadata" children.
+ *
+ * @param messageKey the key bytes, or null if the record has no key
+ * @param message the value bytes, or null if the message was empty or deleted
+ * @param topic the topic the message originates from
+ * @param partition the partition the message originates from
+ * @param offset the offset of the message
+ * @return the combined JSON object
+ * @throws IOException if a present key or value is not valid JSON
+ */
+ @Override
+ public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ if (mapper == null) {
+ mapper = new ObjectMapper();
+ }
+ ObjectNode node = mapper.createObjectNode();
+ // Records may come without a key, and deleted/empty messages have no value (both are
+ // passed as null). Jackson cannot parse a null array, so only parse what is present.
+ if (messageKey != null) {
+ node.set("key", mapper.readValue(messageKey, JsonNode.class));
+ }
+ if (message != null) {
+ node.set("value", mapper.readValue(message, JsonNode.class));
+ }
+ if (includeMetadata) {
+ node.putObject("metadata")
+ .put("offset", offset)
+ .put("topic", topic)
+ .put("partition", partition);
+ }
+ return node;
+ }
+
+ /** This schema never signals the end of the stream. */
+ @Override
+ public boolean isEndOfStream(ObjectNode nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<ObjectNode> getProducedType() {
+ return getForClass(ObjectNode.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
new file mode 100644
index 0000000..4344810
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Deserialization schema from JSON to {@link Row}.
+ *
+ * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
+ * the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
+
+ /** Field names to parse. Indices match fieldTypes indices. */
+ private final String[] fieldNames;
+
+ /** Types to parse fields as. Indices match fieldNames indices. */
+ private final TypeInformation<?>[] fieldTypes;
+
+ /** Object mapper for parsing the JSON. */
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ /** Flag indicating whether to fail on a missing field. */
+ private boolean failOnMissingField;
+
+ /**
+ * Creates a JSON deserialization schema for the given fields and type classes.
+ *
+ * @param fieldNames Names of JSON fields to parse.
+ * @param fieldTypes Type classes to parse JSON fields as.
+ * @throws NullPointerException if either array is null
+ * @throws IllegalArgumentException if the arrays differ in length
+ */
+ public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] fieldTypes) {
+ this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
+ // Validate both arrays before iterating, so that a null or mismatched input fails
+ // with a descriptive message instead of an anonymous NPE or wasted type analysis.
+ Preconditions.checkNotNull(fieldTypes, "Field types");
+ Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+ "Number of provided field names and types does not match.");
+
+ this.fieldTypes = new TypeInformation[fieldTypes.length];
+ for (int i = 0; i < fieldTypes.length; i++) {
+ this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]);
+ }
+ }
+
+ /**
+ * Creates a JSON deserialization schema for the given fields and types.
+ *
+ * @param fieldNames Names of JSON fields to parse.
+ * @param fieldTypes Types to parse JSON fields as.
+ * @throws NullPointerException if either array is null
+ * @throws IllegalArgumentException if the arrays differ in length
+ */
+ public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
+ this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
+
+ Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+ "Number of provided field names and types does not match.");
+ }
+
+ /**
+ * Parses the message as a JSON object and extracts the configured fields into a {@link Row}.
+ *
+ * <p>A missing field is set to {@code null}, unless {@link #setFailOnMissingField(boolean)}
+ * was enabled, in which case deserialization fails.
+ *
+ * @param message The JSON message bytes.
+ * @return A row with one entry per configured field name.
+ * @throws IOException If the message cannot be parsed or a field cannot be converted.
+ */
+ @Override
+ public Row deserialize(byte[] message) throws IOException {
+ try {
+ JsonNode root = objectMapper.readTree(message);
+
+ Row row = new Row(fieldNames.length);
+ for (int i = 0; i < fieldNames.length; i++) {
+ JsonNode node = root.get(fieldNames[i]);
+
+ if (node == null) {
+ if (failOnMissingField) {
+ throw new IllegalStateException("Failed to find field with name '"
+ + fieldNames[i] + "'.");
+ } else {
+ row.setField(i, null);
+ }
+ } else {
+ // Read the value as specified type
+ Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
+ row.setField(i, value);
+ }
+ }
+
+ return row;
+ } catch (Throwable t) {
+ // Every failure (parse errors, conversion errors, missing fields) surfaces as IOException.
+ throw new IOException("Failed to deserialize JSON object.", t);
+ }
+ }
+
+ /** This schema never signals the end of the stream. */
+ @Override
+ public boolean isEndOfStream(Row nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return new RowTypeInfo(fieldTypes);
+ }
+
+ /**
+ * Configures the failure behaviour if a JSON field is missing.
+ *
+ * <p>By default, a missing field is ignored and the field is set to null.
+ *
+ * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+ */
+ public void setFailOnMissingField(boolean failOnMissingField) {
+ this.failOnMissingField = failOnMissingField;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
new file mode 100644
index 0000000..077ff13
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.util.Preconditions;
+
+
+/**
+ * Serialization schema that serializes a {@link Row} into JSON bytes.
+ *
+ * <p>Serializes the input {@link Row} object into a JSON object whose keys are the configured
+ * field names, and converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using
+ * {@link JsonRowDeserializationSchema}.
+ */
+public class JsonRowSerializationSchema implements SerializationSchema<Row> {
+ /** Field names for the input Row object, matched to the row's fields by position. */
+ private final String[] fieldNames;
+
+ /**
+ * Object mapper that is used to create output JSON objects. It is static (shared by all
+ * instances and not serialized with the schema) and final so it cannot be reassigned.
+ */
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ /**
+ * Creates a JSON serialization schema for the given field names.
+ *
+ * @param fieldNames Names of the JSON fields to write, in the order of the row's fields.
+ */
+ public JsonRowSerializationSchema(String[] fieldNames) {
+ this.fieldNames = Preconditions.checkNotNull(fieldNames);
+ }
+
+ /**
+ * Serializes the row into the UTF-8 bytes of a JSON object.
+ *
+ * @param row The row to serialize; its arity must equal the number of field names.
+ * @return The JSON bytes.
+ * @throws IllegalStateException If the row arity does not match the number of field names.
+ * @throws RuntimeException If Jackson fails to write the JSON object.
+ */
+ @Override
+ public byte[] serialize(Row row) {
+ if (row.productArity() != fieldNames.length) {
+ throw new IllegalStateException(String.format(
+ "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
+ }
+
+ ObjectNode objectNode = mapper.createObjectNode();
+
+ for (int i = 0; i < row.productArity(); i++) {
+ JsonNode node = mapper.valueToTree(row.productElement(i));
+ objectNode.set(fieldNames[i], node);
+ }
+
+ try {
+ return mapper.writeValueAsBytes(objectNode);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to serialize row", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
new file mode 100644
index 0000000..01e72ca
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the byte key / value messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
+ * processed by Flink.
+ *
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+ /**
+ * Deserializes the byte message.
+ *
+ * @param messageKey The key as a byte array (null if no key has been set).
+ * @param message The message, as a byte array (null if the message was empty or deleted).
+ * @param topic The topic the message has originated from.
+ * @param partition The partition the message has originated from.
+ * @param offset The offset of the message in the original source (for example the Kafka offset).
+ * @return The deserialized message as an object.
+ * @throws IOException If the message cannot be deserialized.
+ */
+ T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
+
+ /**
+ * Method to decide whether the element signals the end of the stream. If
+ * true is returned the element won't be emitted.
+ *
+ * @param nextElement The element to test for the end-of-stream signal.
+ * @return True, if the element signals end of stream, false otherwise.
+ */
+ boolean isEndOfStream(T nextElement);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..4b9dba2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * Adapter that exposes a plain {@link DeserializationSchema} through the
+ * {@link KeyedDeserializationSchema} interface.
+ *
+ * <p>Only the message value is handed to the wrapped schema. The key, topic, partition and
+ * offset are ignored.
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
+
+ private static final long serialVersionUID = 2651665280744549932L;
+
+ /** The schema every call is delegated to. */
+ private final DeserializationSchema<T> deserializationSchema;
+
+ /**
+ * Wraps the given schema.
+ *
+ * @param deserializationSchema the schema used to deserialize message values
+ */
+ public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return this.deserializationSchema.getProducedType();
+ }
+
+ @Override
+ public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ // the key and the source metadata have no counterpart in DeserializationSchema
+ return this.deserializationSchema.deserialize(message);
+ }
+
+ @Override
+ public boolean isEndOfStream(T nextElement) {
+ return this.deserializationSchema.isEndOfStream(nextElement);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
new file mode 100644
index 0000000..701281e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+/**
+ * Describes how to turn a data object into separately serialized key and value
+ * representations, plus an optional target topic. Data sinks such as Apache Kafka require
+ * the data to be handed to them in a specific format (for example as byte strings).
+ *
+ * @param <T> The type to be serialized.
+ */
+public interface KeyedSerializationSchema<T> extends Serializable {
+
+ /**
+ * Serializes the key of the given element.
+ *
+ * <p>Implementations may return {@code null} when no key is available.
+ *
+ * @param element The incoming element to be serialized.
+ * @return The key of the element as a byte array, or {@code null}.
+ */
+ byte[] serializeKey(T element);
+
+ /**
+ * Serializes the value of the given element.
+ *
+ * @param element The incoming element to be serialized.
+ * @return The value of the element as a byte array.
+ */
+ byte[] serializeValue(T element);
+
+ /**
+ * Optionally determines the topic the element should be written to.
+ *
+ * @param element Incoming element to determine the target topic from.
+ * @return The target topic, or {@code null} if the element does not override the topic.
+ */
+ String getTargetTopic(T element);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
new file mode 100644
index 0000000..1b3e486
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+/**
+ * A simple wrapper for using a {@link SerializationSchema} with the {@link KeyedSerializationSchema}
+ * interface.
+ *
+ * <p>The wrapped schema only produces the value: elements never have a key, and the target
+ * topic is never overridden.
+ *
+ * @param <T> The type to serialize
+ */
+public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
+
+ private static final long serialVersionUID = 1351665280744549933L;
+
+ private final SerializationSchema<T> serializationSchema;
+
+ /**
+ * Wraps the given schema.
+ *
+ * @param serializationSchema the schema used to serialize element values
+ */
+ public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ }
+
+ /** Always returns {@code null}: a plain SerializationSchema has no notion of a key. */
+ @Override
+ public byte[] serializeKey(T element) {
+ return null;
+ }
+
+ @Override
+ public byte[] serializeValue(T element) {
+ return serializationSchema.serialize(element);
+ }
+
+ @Override
+ public String getTargetTopic(T element) {
+ return null; // we are never overriding the topic
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
new file mode 100644
index 0000000..51bc8d1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serialization and deserialization schema for key/value pairs that uses Flink's serialization
+ * stack to transform typed keys and values from and to byte arrays.
+ *
+ * <p>A {@code null} key or value is represented as a {@code null} byte array in both directions.
+ *
+ * @param <K> The key type to be serialized.
+ * @param <V> The value type to be serialized.
+ */
+public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> {
+
+ private static final long serialVersionUID = -5359448468131559102L;
+
+ /** The serializer for the key */
+ private final TypeSerializer<K> keySerializer;
+
+ /** The serializer for the value */
+ private final TypeSerializer<V> valueSerializer;
+
+ /** reusable input deserialization buffer */
+ private final DataInputDeserializer inputDeserializer;
+
+ /** reusable output serialization buffer for the key (created lazily) */
+ private transient DataOutputSerializer keyOutputSerializer;
+
+ /** reusable output serialization buffer for the value (created lazily) */
+ private transient DataOutputSerializer valueOutputSerializer;
+
+
+ /** The type information, to be returned by {@link #getProducedType()}. It is
+ * transient, because it is not serializable. Note that this means that the type information
+ * is not available at runtime, but only prior to the first serialization / deserialization */
+ private final transient TypeInformation<Tuple2<K, V>> typeInfo;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new de-/serialization schema for the given types.
+ *
+ * @param keyTypeInfo The type information for the key type de-/serialized by this schema.
+ * @param valueTypeInfo The type information for the value type de-/serialized by this schema.
+ * @param ec The execution config, which is used to parametrize the type serializers.
+ */
+ public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
+ this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
+ this.keySerializer = keyTypeInfo.createSerializer(ec);
+ this.valueSerializer = valueTypeInfo.createSerializer(ec);
+ this.inputDeserializer = new DataInputDeserializer();
+ }
+
+ /**
+ * Creates a new de-/serialization schema for the given types. This constructor accepts the types
+ * as classes and internally constructs the type information from the classes.
+ *
+ * <p>If the types are parametrized and cannot be fully defined via classes, use the constructor
+ * that accepts {@link TypeInformation} instead.
+ *
+ * @param keyClass The class of the key de-/serialized by this schema.
+ * @param valueClass The class of the value de-/serialized by this schema.
+ * @param config The execution config, which is used to parametrize the type serializers.
+ */
+ public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) {
+ this(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass), config);
+ }
+
+ // ------------------------------------------------------------------------
+
+
+ /**
+ * Deserializes key and value with the configured serializers. A null key or value array
+ * results in a null field of the returned tuple.
+ */
+ @Override
+ public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ K key = null;
+ V value = null;
+
+ if (messageKey != null) {
+ inputDeserializer.setBuffer(messageKey, 0, messageKey.length);
+ key = keySerializer.deserialize(inputDeserializer);
+ }
+ if (message != null) {
+ inputDeserializer.setBuffer(message, 0, message.length);
+ value = valueSerializer.deserialize(inputDeserializer);
+ }
+ return new Tuple2<>(key, value);
+ }
+
+ /**
+ * This schema never considers an element to signal end-of-stream, so this method returns always false.
+ * @param nextElement The element to test for the end-of-stream signal.
+ * @return Returns false.
+ */
+ @Override
+ public boolean isEndOfStream(Tuple2<K,V> nextElement) {
+ return false;
+ }
+
+
+ /**
+ * Serializes the key ({@code f0}) of the element, or returns null if the key is null.
+ * The returned array is always a fresh copy owned by the caller.
+ */
+ @Override
+ public byte[] serializeKey(Tuple2<K, V> element) {
+ if (element.f0 == null) {
+ return null;
+ }
+ if (keyOutputSerializer == null) {
+ keyOutputSerializer = new DataOutputSerializer(16);
+ }
+ return serializeToBytes(element.f0, keySerializer, keyOutputSerializer);
+ }
+
+ /**
+ * Serializes the value ({@code f1}) of the element, or returns null if the value is null.
+ * The returned array is always a fresh copy owned by the caller.
+ */
+ @Override
+ public byte[] serializeValue(Tuple2<K, V> element) {
+ // if the value is null, its serialized value is null as well.
+ if (element.f1 == null) {
+ return null;
+ }
+ if (valueOutputSerializer == null) {
+ valueOutputSerializer = new DataOutputSerializer(16);
+ }
+ return serializeToBytes(element.f1, valueSerializer, valueOutputSerializer);
+ }
+
+ /**
+ * Serializes one record through the given reusable buffer and returns an exact-length copy.
+ *
+ * <p>Always copying matters because the buffer's backing array is reused by the next call.
+ * Returning that array directly, when its length happens to equal the serialized size, would
+ * let a later record overwrite bytes the caller still holds. The buffer is cleared in
+ * {@code finally}, so a failed record cannot leave partial bytes behind.
+ *
+ * @throws RuntimeException wrapping the IOException if serialization fails
+ */
+ private static <T> byte[] serializeToBytes(T record, TypeSerializer<T> serializer, DataOutputSerializer buffer) {
+ try {
+ serializer.serialize(record, buffer);
+ byte[] result = new byte[buffer.length()];
+ System.arraycopy(buffer.getByteArray(), 0, result, 0, result.length);
+ return result;
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Unable to serialize record", e);
+ }
+ finally {
+ buffer.clear();
+ }
+ }
+
+ @Override
+ public String getTargetTopic(Tuple2<K, V> element) {
+ return null; // we are never overriding the topic
+ }
+
+
+ @Override
+ public TypeInformation<Tuple2<K,V>> getProducedType() {
+ if (typeInfo != null) {
+ return typeInfo;
+ }
+ else {
+ throw new IllegalStateException(
+ "The type information is not available after this class has been serialized and distributed.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
new file mode 100644
index 0000000..b96ba30
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link FlinkKafkaConsumerBase}. The consumer's fetcher, pending-offsets map and
+ * running flag are injected via reflection. Operator state is backed by {@link TestingListState}.
+ */
+public class FlinkKafkaConsumerBaseTest {
+
+	/**
+	 * Tests that not both types of timestamp extractors / watermark generators can be used.
+	 */
+	@Test
+	public void testEitherWatermarkExtractor() {
+		try {
+			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
+			fail();
+		} catch (NullPointerException ignored) {}
+
+		try {
+			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
+			fail();
+		} catch (NullPointerException ignored) {}
+
+		@SuppressWarnings("unchecked")
+		final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
+		@SuppressWarnings("unchecked")
+		final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
+
+		DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
+		c1.assignTimestampsAndWatermarks(periodicAssigner);
+		try {
+			c1.assignTimestampsAndWatermarks(punctuatedAssigner);
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
+		c2.assignTimestampsAndWatermarks(punctuatedAssigner);
+		try {
+			c2.assignTimestampsAndWatermarks(periodicAssigner);
+			fail();
+		} catch (IllegalStateException ignored) {}
+	}
+
+	/**
+	 * Tests that a snapshot taken while the consumer is not running neither clears nor
+	 * writes any offsets into the consumer's state.
+	 */
+	@Test
+	public void ignoreCheckpointWhenNotRunning() throws Exception {
+		@SuppressWarnings("unchecked")
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
+
+		// Hand out the same list state through both accessors. The assertions below then observe
+		// the consumer's offset state whichever accessor it uses.
+		TestingListState<Serializable> listState = new TestingListState<>();
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+		when(initializationContext.isRestored()).thenReturn(false);
+		consumer.initializeState(initializationContext);
+
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
+
+		assertFalse(listState.isClearCalled());
+		assertFalse(listState.get().iterator().hasNext());
+		consumer.notifyCheckpointComplete(66L);
+	}
+
+	/**
+	 * Tests that a snapshot taken before the fetcher exists writes exactly the restored
+	 * offsets back into the state.
+	 */
+	@Test
+	public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
+		final Tuple2<KafkaTopicPartition, Long> restoredOffset1 = Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L);
+		final Tuple2<KafkaTopicPartition, Long> restoredOffset2 = Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L);
+
+		TestingListState<Serializable> listState = new TestingListState<>();
+		listState.add(restoredOffset1);
+		listState.add(restoredOffset2);
+
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+		when(initializationContext.isRestored()).thenReturn(true);
+
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+		consumer.initializeState(initializationContext);
+
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
+
+		// ensure that the list was cleared and refilled. while this is an implementation detail, we use it here
+		// to figure out that snapshotState() actually did something.
+		assertTrue(listState.isClearCalled());
+
+		// the snapshot must contain exactly the restored offsets, each of them exactly once
+		Set<Serializable> expected = new HashSet<>();
+		expected.add(restoredOffset1);
+		expected.add(restoredOffset2);
+
+		Set<Serializable> actual = new HashSet<>();
+		int counter = 0;
+		for (Serializable serializable : listState.get()) {
+			actual.add(serializable);
+			counter++;
+		}
+
+		assertEquals(expected, actual);
+		assertEquals(expected.size(), counter);
+	}
+
+	/**
+	 * Tests that a snapshot taken before the fetcher exists, with no restored state,
+	 * leaves the state empty.
+	 */
+	@Test
+	public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		TestingListState<Serializable> listState = new TestingListState<>();
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+		when(initializationContext.isRestored()).thenReturn(false);
+
+		consumer.initializeState(initializationContext);
+
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
+
+		assertFalse(listState.get().iterator().hasNext());
+	}
+
+	/**
+	 * Tests that, when no state was restored, running the consumer never asks the fetcher
+	 * to restore offsets.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void checkUseFetcherWhenNoCheckpoint() throws Exception {
+
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+		List<KafkaTopicPartition> partitionList = new ArrayList<>(1);
+		partitionList.add(new KafkaTopicPartition("test", 0));
+		consumer.setSubscribedPartitions(partitionList);
+
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		TestingListState<Serializable> listState = new TestingListState<>();
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+
+		// Make the context signal that there is no restored state. The fetcher created by
+		// DummyFlinkKafkaConsumer fails the test if restoreOffsets() is invoked on it.
+		when(initializationContext.isRestored()).thenReturn(false);
+		consumer.initializeState(initializationContext);
+		consumer.run(mock(SourceFunction.SourceContext.class));
+	}
+
+	/**
+	 * Tests three properties of snapshots and checkpoint completion:
+	 * <ul>
+	 * <li>Snapshots store the fetcher's current offsets in the state and register them as
+	 * pending commits.</li>
+	 * <li>Completing a checkpoint removes it and all older pending entries.</li>
+	 * <li>The number of pending entries is bounded by {@code MAX_NUM_PENDING_CHECKPOINTS}.</li>
+	 * </ul>
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSnapshotState() throws Exception {
+
+		// --------------------------------------------------------------------
+		// prepare fake states
+		// --------------------------------------------------------------------
+
+		final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
+		state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+		state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+		final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
+		state2.put(new KafkaTopicPartition("abc", 13), 16770L);
+		state2.put(new KafkaTopicPartition("def", 7), 987654329L);
+
+		final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
+		state3.put(new KafkaTopicPartition("abc", 13), 16780L);
+		state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+
+		// --------------------------------------------------------------------
+
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+
+		final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
+		assertEquals(0, pendingOffsetsToCommit.size());
+
+		OperatorStateStore backend = mock(OperatorStateStore.class);
+
+		TestingListState<Serializable> listState = new TestingListState<>();
+
+		when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+
+		when(initializationContext.getOperatorStateStore()).thenReturn(backend);
+		when(initializationContext.isRestored()).thenReturn(false, true, true, true);
+
+		consumer.initializeState(initializationContext);
+
+		// checkpoint 1
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
+
+		assertEquals(state1, toOffsetMap(listState));
+		assertEquals(1, pendingOffsetsToCommit.size());
+		assertEquals(state1, pendingOffsetsToCommit.get(138L));
+
+		// checkpoint 2
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140));
+
+		assertEquals(state2, toOffsetMap(listState));
+		assertEquals(2, pendingOffsetsToCommit.size());
+		assertEquals(state2, pendingOffsetsToCommit.get(140L));
+
+		// ack checkpoint 1
+		consumer.notifyCheckpointComplete(138L);
+		assertEquals(1, pendingOffsetsToCommit.size());
+		assertTrue(pendingOffsetsToCommit.containsKey(140L));
+
+		// checkpoint 3
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
+
+		assertEquals(state3, toOffsetMap(listState));
+		assertEquals(2, pendingOffsetsToCommit.size());
+		assertEquals(state3, pendingOffsetsToCommit.get(141L));
+
+		// ack checkpoint 3, subsumes number 2
+		consumer.notifyCheckpointComplete(141L);
+		assertEquals(0, pendingOffsetsToCommit.size());
+
+		consumer.notifyCheckpointComplete(666); // invalid checkpoint
+		assertEquals(0, pendingOffsetsToCommit.size());
+
+		// create 500 snapshots; the fetcher mock keeps returning state3 for every further call
+		for (int i = 100; i < 600; i++) {
+			consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
+		}
+		assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingOffsetsToCommit.size());
+
+		// commit only the second last
+		consumer.notifyCheckpointComplete(598);
+		assertEquals(1, pendingOffsetsToCommit.size());
+
+		// access invalid checkpoint
+		consumer.notifyCheckpointComplete(590);
+
+		// and the last
+		consumer.notifyCheckpointComplete(599);
+		assertEquals(0, pendingOffsetsToCommit.size());
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Collects the (partition, offset) tuples currently held in the given list state into a map.
+	 */
+	@SuppressWarnings("unchecked")
+	private static HashMap<KafkaTopicPartition, Long> toOffsetMap(ListState<Serializable> listState) throws Exception {
+		HashMap<KafkaTopicPartition, Long> offsets = new HashMap<>();
+		for (Serializable entry : listState.get()) {
+			Tuple2<KafkaTopicPartition, Long> partitionOffset = (Tuple2<KafkaTopicPartition, Long>) entry;
+			offsets.put(partitionOffset.f0, partitionOffset.f1);
+		}
+		return offsets;
+	}
+
+	/**
+	 * Creates a {@link DummyFlinkKafkaConsumer} and injects the fetcher, the pending-offsets map
+	 * and the running flag into the corresponding private fields of {@link FlinkKafkaConsumerBase}.
+	 */
+	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
+			AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
+	{
+		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+
+		setBaseField(consumer, "kafkaFetcher", fetcher);
+		setBaseField(consumer, "pendingOffsetsToCommit", pendingOffsetsToCommit);
+		setBaseField(consumer, "running", running);
+
+		return consumer;
+	}
+
+	/**
+	 * Sets a (possibly private) field declared in {@link FlinkKafkaConsumerBase} via reflection.
+	 */
+	private static void setBaseField(FlinkKafkaConsumerBase<?> consumer, String fieldName, Object value) throws Exception {
+		Field field = FlinkKafkaConsumerBase.class.getDeclaredField(fieldName);
+		field.setAccessible(true);
+		field.set(consumer, value);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Consumer with a mocked deserialization schema and no Kafka partitions. Its fetchers are
+	 * mocks that fail the test if asked to restore offsets.
+	 */
+	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings("unchecked")
+		public DummyFlinkKafkaConsumer() {
+			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		protected AbstractFetcher<T, ?> createFetcher(
+				SourceContext<T> sourceContext,
+				List<KafkaTopicPartition> thisSubtaskPartitions,
+				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+				StreamingRuntimeContext runtimeContext) throws Exception {
+			AbstractFetcher<T, ?> fetcher = mock(AbstractFetcher.class);
+			doAnswer(new Answer<Object>() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					Assert.fail("Trying to restore offsets even though there was no restore state.");
+					return null;
+				}
+			}).when(fetcher).restoreOffsets(any(HashMap.class));
+			return fetcher;
+		}
+
+		@Override
+		protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
+			return Collections.emptyList();
+		}
+
+		@Override
+		public RuntimeContext getRuntimeContext() {
+			return mock(StreamingRuntimeContext.class);
+		}
+	}
+
+	/**
+	 * In-memory {@link ListState} backed by an {@link ArrayList}. It also records whether
+	 * {@link #clear()} was ever invoked.
+	 */
+	private static final class TestingListState<T> implements ListState<T> {
+
+		private final List<T> list = new ArrayList<>();
+		private boolean clearCalled = false;
+
+		@Override
+		public void clear() {
+			list.clear();
+			clearCalled = true;
+		}
+
+		@Override
+		public Iterable<T> get() throws Exception {
+			return list;
+		}
+
+		@Override
+		public void add(T value) throws Exception {
+			list.add(value);
+		}
+
+		public List<T> getList() {
+			return list;
+		}
+
+		public boolean isClearCalled() {
+			return clearCalled;
+		}
+	}
+}