Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:23 UTC
[33/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
new file mode 100644
index 0000000..58eb043
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -0,0 +1,212 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-kafka-base_2.10</artifactId>
+ <name>flink-connector-kafka-base</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <kafka.version>0.8.2.2</kafka.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+			<!-- Projects depending on this project
+			won't depend on flink-table. -->
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-annotation</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <!-- force using the latest zkclient -->
+ <dependency>
+ <groupId>com.101tec</groupId>
+ <artifactId>zkclient</artifactId>
+ <version>0.7</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-jmx</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <version>${minikdc.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.101tec</groupId>
+ <artifactId>zkclient</artifactId>
+ <version>0.7</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!--
+ https://issues.apache.org/jira/browse/DIRSHARED-134
+ Required to pull the Mini-KDC transitive dependency
+ -->
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>3.0.1</version>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
new file mode 100644
index 0000000..aef7116
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class of all Flink Kafka Consumer data sources.
+ * This implements the common behavior across all Kafka versions.
+ *
+ * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
+ * {@link AbstractFetcher}.
+ *
+ * @param <T> The type of records produced by this data source
+ */
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
+ CheckpointListener,
+ ResultTypeQueryable<T>,
+ CheckpointedFunction {
+ private static final long serialVersionUID = -6272159445203409112L;
+
+ protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+
+ /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
+ public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+ /** Boolean configuration key to disable metrics tracking **/
+ public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+ // ------------------------------------------------------------------------
+ // configuration state, set on the client relevant for all subtasks
+ // ------------------------------------------------------------------------
+
+ private final List<String> topics;
+
+ /** The schema to convert between Kafka's byte messages, and Flink's objects */
+ protected final KeyedDeserializationSchema<T> deserializer;
+
+ /** The set of topic partitions that the source will read */
+ protected List<KafkaTopicPartition> subscribedPartitions;
+
+ /** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+ * to exploit per-partition timestamp characteristics.
+ * The assigner is kept in serialized form, to deserialize it into multiple copies */
+ private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
+
+ /** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+ * to exploit per-partition timestamp characteristics.
+ * The assigner is kept in serialized form, to deserialize it into multiple copies */
+ private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
+
+ private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
+
+ // ------------------------------------------------------------------------
+ // runtime state (used individually by each parallel subtask)
+ // ------------------------------------------------------------------------
+
+ /** Data for pending but uncommitted offsets */
+ private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+ /** The fetcher implements the connections to the Kafka brokers */
+ private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
+
+ /** The offsets to restore to, if the consumer restores state from a checkpoint */
+ private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;
+
+ /** Flag indicating whether the consumer is still running **/
+ private volatile boolean running = true;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Base constructor.
+ *
+ * @param deserializer
+ * The deserializer to turn raw byte messages into Java/Scala objects.
+ */
+ public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
+ this.topics = checkNotNull(topics);
+ checkArgument(topics.size() > 0, "You have to define at least one topic.");
+ this.deserializer = checkNotNull(deserializer, "valueDeserializer");
+ }
+
+ /**
+ * This method must be called from the subclasses, to set the list of all subscribed partitions
+ * that this consumer will fetch from (across all subtasks).
+ *
+ * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from.
+ */
+ protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
+ checkNotNull(allSubscribedPartitions);
+ this.subscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
+ }
+
+ // ------------------------------------------------------------------------
+ // Configuration
+ // ------------------------------------------------------------------------
+
+ /**
+ * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
+ * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
+ * in the same way as in the Flink runtime, when streams are merged.
+ *
+ * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+ * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
+ * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
+ * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
+	 * parallel source subtask reads more than one partition.
+ *
+ * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
+	 * partition, allows users to exploit the per-partition characteristics.
+ *
+ * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
+ * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+ *
+ * @param assigner The timestamp assigner / watermark generator to use.
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
+ checkNotNull(assigner);
+
+ if (this.periodicWatermarkAssigner != null) {
+ throw new IllegalStateException("A periodic watermark emitter has already been set.");
+ }
+ try {
+ ClosureCleaner.clean(assigner, true);
+ this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
+ return this;
+ } catch (Exception e) {
+ throw new IllegalArgumentException("The given assigner is not serializable", e);
+ }
+ }
+
+ /**
+	 * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks in a periodic manner.
+ * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
+ * in the same way as in the Flink runtime, when streams are merged.
+ *
+ * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+ * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
+ * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
+ * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
+	 * parallel source subtask reads more than one partition.
+ *
+ * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
+	 * partition, allows users to exploit the per-partition characteristics.
+ *
+ * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
+ * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+ *
+ * @param assigner The timestamp assigner / watermark generator to use.
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
+ checkNotNull(assigner);
+
+ if (this.punctuatedWatermarkAssigner != null) {
+ throw new IllegalStateException("A punctuated watermark emitter has already been set.");
+ }
+ try {
+ ClosureCleaner.clean(assigner, true);
+ this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
+ return this;
+ } catch (Exception e) {
+ throw new IllegalArgumentException("The given assigner is not serializable", e);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Work methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void run(SourceContext<T> sourceContext) throws Exception {
+ if (subscribedPartitions == null) {
+ throw new Exception("The partitions were not set for the consumer");
+ }
+
+		// we only need to do work if we actually have partitions assigned
+ if (!subscribedPartitions.isEmpty()) {
+
+ // (1) create the fetcher that will communicate with the Kafka brokers
+ final AbstractFetcher<T, ?> fetcher = createFetcher(
+ sourceContext, subscribedPartitions,
+ periodicWatermarkAssigner, punctuatedWatermarkAssigner,
+ (StreamingRuntimeContext) getRuntimeContext());
+
+ // (2) set the fetcher to the restored checkpoint offsets
+ if (restoreToOffset != null) {
+ fetcher.restoreOffsets(restoreToOffset);
+ }
+
+ // publish the reference, for snapshot-, commit-, and cancel calls
+ // IMPORTANT: We can only do that now, because only now will calls to
+			// the fetcher's 'snapshotCurrentState()' method return at least
+ // the restored offsets
+ this.kafkaFetcher = fetcher;
+ if (!running) {
+ return;
+ }
+
+			// (3) run the fetcher's main work method
+ fetcher.runFetchLoop();
+ }
+ else {
+ // this source never completes, so emit a Long.MAX_VALUE watermark
+ // to not block watermark forwarding
+ sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+
+ // wait until this is canceled
+ final Object waitLock = new Object();
+ while (running) {
+ try {
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (waitLock) {
+ waitLock.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ if (!running) {
+ // restore the interrupted state, and fall through the loop
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // set ourselves as not running
+ running = false;
+
+ // abort the fetcher, if there is one
+ if (kafkaFetcher != null) {
+ kafkaFetcher.cancel();
+ }
+
+		// there will be an interrupt() call to the main thread anyway
+ }
+
+ @Override
+ public void open(Configuration configuration) {
+ List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
+
+ if (kafkaTopicPartitions != null) {
+ assignTopicPartitions(kafkaTopicPartitions);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ // pretty much the same logic as cancelling
+ try {
+ cancel();
+ } finally {
+ super.close();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Checkpoint and restore
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+
+ OperatorStateStore stateStore = context.getOperatorStateStore();
+ offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+
+ if (context.isRestored()) {
+ restoreToOffset = new HashMap<>();
+ for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
+ restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+ }
+
+ LOG.info("Setting restore state in the FlinkKafkaConsumer.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using the following offsets: {}", restoreToOffset);
+ }
+ } else {
+ LOG.info("No restore state for FlinkKafkaConsumer.");
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ if (!running) {
+ LOG.debug("snapshotState() called on closed source");
+ } else {
+
+ offsetsStateForCheckpoint.clear();
+
+ final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+ if (fetcher == null) {
+ // the fetcher has not yet been initialized, which means we need to return the
+ // originally restored offsets or the assigned partitions
+
+ if (restoreToOffset != null) {
+
+ for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
+ offsetsStateForCheckpoint.add(
+ Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+ }
+ } else if (subscribedPartitions != null) {
+ for (KafkaTopicPartition subscribedPartition : subscribedPartitions) {
+ offsetsStateForCheckpoint.add(
+ Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
+ }
+ }
+
+ // the map cannot be asynchronously updated, because only one checkpoint call can happen
+ // on this function at a time: either snapshotState() or notifyCheckpointComplete()
+ pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
+ } else {
+ HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
+
+ // the map cannot be asynchronously updated, because only one checkpoint call can happen
+ // on this function at a time: either snapshotState() or notifyCheckpointComplete()
+ pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+
+ for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
+ offsetsStateForCheckpoint.add(
+ Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+ }
+ }
+
+ // truncate the map of pending offsets to commit, to prevent infinite growth
+ while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+ pendingOffsetsToCommit.remove(0);
+ }
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ if (!running) {
+ LOG.debug("notifyCheckpointComplete() called on closed source");
+ return;
+ }
+
+ final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+ if (fetcher == null) {
+ LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+ return;
+ }
+
+ // only one commit operation must be in progress
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
+ }
+
+ try {
+ final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+ return;
+ }
+
+ @SuppressWarnings("unchecked")
+ HashMap<KafkaTopicPartition, Long> offsets =
+ (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
+
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingOffsetsToCommit.remove(0);
+ }
+
+ if (offsets == null || offsets.size() == 0) {
+ LOG.debug("Checkpoint state was empty.");
+ return;
+ }
+ fetcher.commitInternalOffsetsToKafka(offsets);
+ }
+ catch (Exception e) {
+ if (running) {
+ throw e;
+ }
+ // else ignore exception if we are no longer running
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Kafka Consumer specific methods
+ // ------------------------------------------------------------------------
+
+ /**
+	 * Creates the fetcher that connects to the Kafka brokers, pulls data, deserializes the
+ * data, and emits it into the data streams.
+ *
+ * @param sourceContext The source context to emit data to.
+ * @param thisSubtaskPartitions The set of partitions that this subtask should handle.
+ * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
+ * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
+ * @param runtimeContext The task's runtime context.
+ *
+ * @return The instantiated fetcher
+ *
+ * @throws Exception The method should forward exceptions
+ */
+ protected abstract AbstractFetcher<T, ?> createFetcher(
+ SourceContext<T> sourceContext,
+ List<KafkaTopicPartition> thisSubtaskPartitions,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ StreamingRuntimeContext runtimeContext) throws Exception;
+
+ protected abstract List<KafkaTopicPartition> getKafkaPartitions(List<String> topics);
+
+ // ------------------------------------------------------------------------
+ // ResultTypeQueryable methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializer.getProducedType();
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
+ subscribedPartitions = new ArrayList<>();
+
+ if (restoreToOffset != null) {
+ for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
+ if (restoreToOffset.containsKey(kafkaTopicPartition)) {
+ subscribedPartitions.add(kafkaTopicPartition);
+ }
+ }
+ } else {
+ Collections.sort(kafkaTopicPartitions, new Comparator<KafkaTopicPartition>() {
+ @Override
+ public int compare(KafkaTopicPartition o1, KafkaTopicPartition o2) {
+ int topicComparison = o1.getTopic().compareTo(o2.getTopic());
+
+ if (topicComparison == 0) {
+ return o1.getPartition() - o2.getPartition();
+ } else {
+ return topicComparison;
+ }
+ }
+ });
+
+ for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) {
+ subscribedPartitions.add(kafkaTopicPartitions.get(i));
+ }
+ }
+ }
+
+ /**
+ * Selects which of the given partitions should be handled by a specific consumer,
+ * given a certain number of consumers.
+ *
+ * @param allPartitions The partitions to select from
+ * @param numConsumers The number of consumers
+ * @param consumerIndex The index of the specific consumer
+ *
+ * @return The sublist of partitions to be handled by that consumer.
+ */
+ protected static List<KafkaTopicPartition> assignPartitions(
+ List<KafkaTopicPartition> allPartitions,
+ int numConsumers, int consumerIndex) {
+ final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
+ allPartitions.size() / numConsumers + 1);
+
+ for (int i = 0; i < allPartitions.size(); i++) {
+ if (i % numConsumers == consumerIndex) {
+ thisSubtaskPartitions.add(allPartitions.get(i));
+ }
+ }
+
+ return thisSubtaskPartitions;
+ }
+
+ /**
+	 * Logs the partition information at INFO level.
+ *
+ * @param logger The logger to log to.
+ * @param partitionInfos List of subscribed partitions
+ */
+ protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> partitionInfos) {
+ Map<String, Integer> countPerTopic = new HashMap<>();
+ for (KafkaTopicPartition partition : partitionInfos) {
+ Integer count = countPerTopic.get(partition.getTopic());
+ if (count == null) {
+ count = 1;
+ } else {
+ count++;
+ }
+ countPerTopic.put(partition.getTopic(), count);
+ }
+ StringBuilder sb = new StringBuilder(
+ "Consumer is going to read the following topics (with number of partitions): ");
+
+ for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
+ sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
+ }
+
+ logger.info(sb.toString());
+ }
+}
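
For context, a minimal usage sketch of the per-partition watermark API described in the javadoc above (not part of this commit). It assumes a version-specific subclass such as FlinkKafkaConsumer09 from the flink-connector-kafka-0.9 module, a SimpleStringSchema, and records that carry an epoch-millisecond timestamp prefix; the assigner is deserialized into one copy per Kafka partition inside the source:

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; // assumed version-specific module
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaPerPartitionWatermarks {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "watermark-example");

            // version-specific subclass of FlinkKafkaConsumerBase (assumption)
            FlinkKafkaConsumer09<String> consumer =
                new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);

            // the assigner runs per Kafka partition inside the source, so strictly
            // ascending per-partition timestamps remain exploitable
            consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {

                private long currentMaxTimestamp;

                @Override
                public long extractTimestamp(String element, long previousElementTimestamp) {
                    // assumption: records look like "<epochMillis>,<payload>"
                    long ts = Long.parseLong(element.split(",", 2)[0]);
                    currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
                    return ts;
                }

                @Override
                public Watermark getCurrentWatermark() {
                    // allow one second of out-of-orderness
                    return new Watermark(currentMaxTimestamp - 1000);
                }
            });

            env.addSource(consumer).print();
            env.execute("Kafka per-partition watermarks");
        }
    }
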
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
new file mode 100644
index 0000000..d413f1c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Collections;
+import java.util.Comparator;
+
+import static java.util.Objects.requireNonNull;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic.
+ *
+ * Please note that this producer provides at-least-once reliability guarantees when
+ * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
+ * Otherwise, the producer doesn't provide any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Configuration key for disabling the metrics reporting
+ */
+ public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+ /**
+	 * Array with the partition ids of the given defaultTopicId.
+	 * The size of this array is the number of partitions.
+ */
+ protected int[] partitions;
+
+ /**
+ * User defined properties for the Producer
+ */
+ protected final Properties producerConfig;
+
+ /**
+ * The name of the default topic this producer is writing data to
+ */
+ protected final String defaultTopicId;
+
+ /**
+ * (Serializable) SerializationSchema for turning objects used with Flink into
+ * byte[] for Kafka.
+ */
+ protected final KeyedSerializationSchema<IN> schema;
+
+ /**
+ * User-provided partitioner for assigning an object to a Kafka partition.
+ */
+ protected final KafkaPartitioner<IN> partitioner;
+
+ /**
+ * Flag indicating whether to accept failures (and log them), or to fail on failures
+ */
+ protected boolean logFailuresOnly;
+
+ /**
+	 * If true, the producer will wait until all outstanding records have been sent to the broker.
+ */
+ protected boolean flushOnCheckpoint;
+
+ // -------------------------------- Runtime fields ------------------------------------------
+
+ /** KafkaProducer instance */
+ protected transient KafkaProducer<byte[], byte[]> producer;
+
+	/** The callback that handles error propagation or logging callbacks */
+ protected transient Callback callback;
+
+ /** Errors encountered in the async producer are stored here */
+ protected transient volatile Exception asyncException;
+
+ /** Lock for accessing the pending records */
+ protected final SerializableObject pendingRecordsLock = new SerializableObject();
+
+ /** Number of unacknowledged records. */
+ protected long pendingRecords;
+
+ protected OperatorStateStore stateStore;
+
+
+ /**
+ * The main constructor for creating a FlinkKafkaProducer.
+ *
+ * @param defaultTopicId The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner
+ */
+ public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ requireNonNull(defaultTopicId, "TopicID not set");
+ requireNonNull(serializationSchema, "serializationSchema not set");
+ requireNonNull(producerConfig, "producerConfig not set");
+ ClosureCleaner.clean(customPartitioner, true);
+ ClosureCleaner.ensureSerializable(serializationSchema);
+
+ this.defaultTopicId = defaultTopicId;
+ this.schema = serializationSchema;
+ this.producerConfig = producerConfig;
+
+ // set the producer configuration properties for kafka record key value serializers.
+ if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+ this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+ } else {
+ LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ }
+
+ if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+ this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+ } else {
+ LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ }
+
+ // eagerly ensure that bootstrap servers are set.
+ if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
+ }
+
+ this.partitioner = customPartitioner;
+ }
+
+ // ---------------------------------- Properties --------------------------
+
+ /**
+ * Defines whether the producer should fail on errors, or only log them.
+	 * If this is set to true, then exceptions will only be logged; if set to false,
+	 * exceptions will eventually be thrown and cause the streaming program to
+ * fail (and enter recovery).
+ *
+ * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ this.logFailuresOnly = logFailuresOnly;
+ }
+
+ /**
+ * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
+ * to be acknowledged by the Kafka producer on a checkpoint.
+ * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
+ *
+ * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+ */
+ public void setFlushOnCheckpoint(boolean flush) {
+ this.flushOnCheckpoint = flush;
+ }
+
+ /**
+ * Used for testing only
+ */
+ protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) {
+ return new KafkaProducer<>(props);
+ }
+
+ // ----------------------------------- Utilities --------------------------
+
+ /**
+ * Initializes the connection to Kafka.
+ */
+ @Override
+ public void open(Configuration configuration) {
+ producer = getKafkaProducer(this.producerConfig);
+
+ // the fetched list is immutable, so we're creating a mutable copy in order to sort it
+ List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
+
+ // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+ Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+ @Override
+ public int compare(PartitionInfo o1, PartitionInfo o2) {
+ return Integer.compare(o1.partition(), o2.partition());
+ }
+ });
+
+ partitions = new int[partitionsList.size()];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = partitionsList.get(i).partition();
+ }
+
+ RuntimeContext ctx = getRuntimeContext();
+ if (partitioner != null) {
+ partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
+ }
+
+ LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}",
+ ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
+
+ // register Kafka metrics to Flink accumulators
+ if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
+ Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
+
+ if (metrics == null) {
+ // MapR's Kafka implementation returns null here.
+ LOG.info("Producer implementation does not support metrics");
+ } else {
+ final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
+ for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+ kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+ }
+ }
+ }
+
+ if (flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) {
+ LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
+ flushOnCheckpoint = false;
+ }
+
+ if (logFailuresOnly) {
+ callback = new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception e) {
+ if (e != null) {
+ LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
+ }
+ acknowledgeMessage();
+ }
+ };
+ }
+ else {
+ callback = new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null && asyncException == null) {
+ asyncException = exception;
+ }
+ acknowledgeMessage();
+ }
+ };
+ }
+ }
+
+ /**
+	 * Called when new data arrives at the sink, and forwards it to Kafka.
+ *
+ * @param next
+ * The incoming data
+ */
+ @Override
+ public void invoke(IN next) throws Exception {
+ // propagate asynchronous errors
+ checkErroneous();
+
+ byte[] serializedKey = schema.serializeKey(next);
+ byte[] serializedValue = schema.serializeValue(next);
+ String targetTopic = schema.getTargetTopic(next);
+ if (targetTopic == null) {
+ targetTopic = defaultTopicId;
+ }
+
+ ProducerRecord<byte[], byte[]> record;
+ if (partitioner == null) {
+ record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
+ } else {
+ record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
+ }
+ if (flushOnCheckpoint) {
+ synchronized (pendingRecordsLock) {
+ pendingRecords++;
+ }
+ }
+ producer.send(record, callback);
+ }
+
+
+ @Override
+ public void close() throws Exception {
+ if (producer != null) {
+ producer.close();
+ }
+
+ // make sure we propagate pending errors
+ checkErroneous();
+ }
+
+ // ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+ private void acknowledgeMessage() {
+ if (flushOnCheckpoint) {
+ synchronized (pendingRecordsLock) {
+ pendingRecords--;
+ if (pendingRecords == 0) {
+ pendingRecordsLock.notifyAll();
+ }
+ }
+ }
+ }
+
+ /**
+ * Flush pending records.
+ */
+ protected abstract void flush();
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.stateStore = context.getOperatorStateStore();
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
+ if (flushOnCheckpoint) {
+ // flushing is activated: We need to wait until pendingRecords is 0
+ flush();
+ synchronized (pendingRecordsLock) {
+ if (pendingRecords != 0) {
+ throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
+ }
+ // pending records count is 0. We can now confirm the checkpoint
+ }
+ }
+ }
+
+ // ----------------------------------- Utilities --------------------------
+
+ protected void checkErroneous() throws Exception {
+ Exception e = asyncException;
+ if (e != null) {
+ // prevent double throwing
+ asyncException = null;
+ throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+ }
+ }
+
+ public static Properties getPropertiesFromBrokerList(String brokerList) {
+ String[] elements = brokerList.split(",");
+
+ // validate the broker addresses
+ for (String broker: elements) {
+ NetUtils.getCorrectHostnamePort(broker);
+ }
+
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ return props;
+ }
+}
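
A minimal at-least-once usage sketch for the producer above (not part of this commit). It assumes a version-specific subclass such as FlinkKafkaProducer09 and a SimpleStringSchema from the corresponding connector modules; checkpointing must be enabled for setFlushOnCheckpoint(true) to take effect:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; // assumed version-specific module
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaAtLeastOnceSink {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000); // checkpoints are required for at-least-once

            DataStream<String> stream = env.fromElements("a", "b", "c");

            // validates each "host:port" entry and sets bootstrap.servers
            Properties props = FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092");

            // version-specific subclass of FlinkKafkaProducerBase (assumption)
            FlinkKafkaProducer09<String> producer =
                new FlinkKafkaProducer09<>("my-topic", new SimpleStringSchema(), props);

            producer.setLogFailuresOnly(false);  // fail the job on asynchronous send errors
            producer.setFlushOnCheckpoint(true); // wait for pending records on each checkpoint

            stream.addSink(producer);
            env.execute("Kafka at-least-once producer");
        }
    }
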
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
new file mode 100644
index 0000000..ee98783
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Base class for {@link KafkaTableSink} that serializes data in JSON format
+ */
+public abstract class KafkaJsonTableSink extends KafkaTableSink {
+
+ /**
+	 * Creates a KafkaJsonTableSink.
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ * @param partitioner Kafka partitioner
+ */
+ public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+ super(topic, properties, partitioner);
+ }
+
+ @Override
+ protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {
+ return new JsonRowSerializationSchema(fieldNames);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
new file mode 100644
index 0000000..f145509
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka JSON {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ *
+ * <p>The field names and types are used to parse the JSON records.
+ */
+public abstract class KafkaJsonTableSource extends KafkaTableSource {
+
+ /**
+ * Creates a generic Kafka JSON {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ KafkaJsonTableSource(
+ String topic,
+ Properties properties,
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+ }
+
+ /**
+ * Creates a generic Kafka JSON {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ KafkaJsonTableSource(
+ String topic,
+ Properties properties,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+ }
+
+ /**
+ * Configures the failure behaviour if a JSON field is missing.
+ *
+ * <p>By default, a missing field is ignored and the field is set to null.
+ *
+ * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+ */
+ public void setFailOnMissingField(boolean failOnMissingField) {
+ JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema();
+ deserializationSchema.setFailOnMissingField(failOnMissingField);
+ }
+
+ private static JsonRowDeserializationSchema createDeserializationSchema(
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+ }
+
+ private static JsonRowDeserializationSchema createDeserializationSchema(
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+ }
+}
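
Since the constructors above are package-private, applications are expected to instantiate a version-specific subclass. A small sketch (not part of this commit), assuming such a subclass named Kafka09JsonTableSource exists in the 0.9 connector module with the same constructor arguments:

    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource; // assumed version-specific module

    public class KafkaJsonTableSourceExample {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "table-example");

            String[] fieldNames = {"user", "clicks"};
            TypeInformation<?>[] fieldTypes = {
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.LONG_TYPE_INFO
            };

            // assumed subclass of KafkaJsonTableSource
            KafkaJsonTableSource source =
                new Kafka09JsonTableSource("clicks-topic", props, fieldNames, fieldTypes);

            // by default a missing JSON field becomes null; fail instead
            source.setFailOnMissingField(true);
        }
    }

The source would then be registered with a table environment in the usual way.
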
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
new file mode 100644
index 0000000..714d9cd
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sinks.StreamTableSink;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link StreamTableSink}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}.
+ */
+public abstract class KafkaTableSink implements StreamTableSink<Row> {
+
+ protected final String topic;
+ protected final Properties properties;
+ protected SerializationSchema<Row> serializationSchema;
+ protected final KafkaPartitioner<Row> partitioner;
+ protected String[] fieldNames;
+ protected TypeInformation[] fieldTypes;
+
+ /**
+	 * Creates a KafkaTableSink.
+ *
+ * @param topic Kafka topic to write to.
+ * @param properties Properties for the Kafka consumer.
+ * @param partitioner Partitioner to select Kafka partition for each item
+ */
+ public KafkaTableSink(
+ String topic,
+ Properties properties,
+ KafkaPartitioner<Row> partitioner) {
+
+ this.topic = Preconditions.checkNotNull(topic, "topic");
+ this.properties = Preconditions.checkNotNull(properties, "properties");
+ this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+ }
+
+ /**
+	 * Returns the version-specific Kafka producer.
+ *
+ * @param topic Kafka topic to produce to.
+ * @param properties Properties for the Kafka producer.
+ * @param serializationSchema Serialization schema to use to create Kafka records.
+ * @param partitioner Partitioner to select Kafka partition.
+ * @return The version-specific Kafka producer
+ */
+ protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic, Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ KafkaPartitioner<Row> partitioner);
+
+ /**
+ * Create serialization schema for converting table rows into bytes.
+ *
+ * @param fieldNames Field names in table rows.
+ * @return Instance of serialization schema
+ */
+ protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
+
+ /**
+ * Create a deep copy of this sink.
+ *
+ * @return Deep copy of this sink
+ */
+ protected abstract KafkaTableSink createCopy();
+
+ @Override
+ public void emitDataStream(DataStream<Row> dataStream) {
+ FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
+ dataStream.addSink(kafkaProducer);
+ }
+
+ @Override
+ public TypeInformation<Row> getOutputType() {
+ return new RowTypeInfo(getFieldTypes());
+ }
+
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+ @Override
+ public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ KafkaTableSink copy = createCopy();
+ copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
+ copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
+ Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+ "Number of provided field names and types does not match.");
+ copy.serializationSchema = createSerializationSchema(fieldNames);
+
+ return copy;
+ }
+
+
+
+
+}
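
To illustrate the configure() contract above (not part of this commit): the Table API calls configure() before emitting, and receives a copy of the sink with field names, field types, and a serialization schema derived from the field names; the original instance stays unconfigured. A sketch assuming a version-specific Kafka09JsonTableSink exists and that partition(...) is the only abstract method of KafkaPartitioner:

    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink; // assumed version-specific module
    import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    public class KafkaTableSinkConfigureExample {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");

            // illustration only: send every row to partition 0
            KafkaPartitioner<Row> partitioner = new KafkaPartitioner<Row>() {
                @Override
                public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
                    return 0;
                }
            };

            // assumed subclass of KafkaJsonTableSink
            KafkaTableSink sink = new Kafka09JsonTableSink("output-topic", props, partitioner);

            // configure() returns a configured copy; field counts must match
            String[] fieldNames = {"user", "clicks"};
            TypeInformation<?>[] fieldTypes = {
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.LONG_TYPE_INFO
            };
            KafkaTableSink configured = sink.configure(fieldNames, fieldTypes);

            System.out.println(configured.getOutputType()); // RowTypeInfo over the field types
        }
    }
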
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
new file mode 100644
index 0000000..fd423d7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo;
+
+/**
+ * A version-agnostic Kafka {@link StreamTableSource}.
+ *
+ * <p>A version-specific Kafka table source needs to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
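+ *
+ * <p>For illustration only, a hypothetical version-specific subclass might look roughly
+ * like this (a sketch; the class names are assumptions, not actual connector classes):
+ * <pre>{@code
+ * public class MyKafkaTableSource extends KafkaTableSource {
+ *
+ *     public MyKafkaTableSource(String topic, Properties properties,
+ *             String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ *         super(topic, properties, new MyRowDeserializationSchema(fieldNames, fieldTypes),
+ *             fieldNames, fieldTypes);
+ *     }
+ *
+ *     FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+ *             String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+ *         // return the consumer of the targeted Kafka version
+ *         return new MyVersionSpecificKafkaConsumer<>(topic, deserializationSchema, properties);
+ *     }
+ * }
+ * }</pre>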
+ */
+public abstract class KafkaTableSource implements StreamTableSource<Row> {
+
+ /** The Kafka topic to consume. */
+ private final String topic;
+
+ /** Properties for the Kafka consumer. */
+ private final Properties properties;
+
+ /** Deserialization schema to use for Kafka records. */
+ private final DeserializationSchema<Row> deserializationSchema;
+
+ /** Row field names. */
+ private final String[] fieldNames;
+
+ /** Row field types. */
+ private final TypeInformation<?>[] fieldTypes;
+
+ /**
+ * Creates a generic Kafka {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ KafkaTableSource(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
+ }
+
+ /**
+ * Creates a generic Kafka {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ KafkaTableSource(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ this.topic = Preconditions.checkNotNull(topic, "Topic");
+ this.properties = Preconditions.checkNotNull(properties, "Properties");
+ this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
+ this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
+ this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
+
+ Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+ "Number of provided field names and types does not match.");
+ }
+
+ /**
+ * NOTE: This method is for internal use only for defining a TableSource.
+ * Do not use it in Table API programs.
+ */
+ @Override
+ public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
+ // Version-specific Kafka consumer
+ FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
+ DataStream<Row> kafkaSource = env.addSource(kafkaConsumer);
+ return kafkaSource;
+ }
+
+ @Override
+ public int getNumberOfFields() {
+ return fieldNames.length;
+ }
+
+ @Override
+ public String[] getFieldsNames() {
+ return fieldNames;
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+ @Override
+ public TypeInformation<Row> getReturnType() {
+ return new RowTypeInfo(fieldTypes);
+ }
+
+ /**
+ * Returns the version-specific Kafka consumer.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @return The version-specific Kafka consumer
+ */
+ abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema);
+
+ /**
+ * Returns the deserialization schema.
+ *
+ * @return The deserialization schema
+ */
+ protected DeserializationSchema<Row> getDeserializationSchema() {
+ return deserializationSchema;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
new file mode 100644
index 0000000..cf39606
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all fetchers, which implement the connections to Kafka brokers and
+ * pull records from Kafka partitions.
+ *
+ * <p>This fetcher base class implements the logic around emitting records and tracking offsets,
+ * as well as around the optional timestamp assignment and watermark generation.
+ *
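+ * <p>A rough sketch of how a version-specific fetcher typically drives this base class
+ * (illustrative only; the consumer record type, the {@code running} flag, and the
+ * deserializer call are assumptions of a concrete implementation):
+ * <pre>{@code
+ * public void runFetchLoop() throws Exception {
+ *     while (running) {
+ *         for (MyKafkaRecord rec : pollRecordsFromKafka()) {
+ *             KafkaTopicPartitionState<MyPartitionHandle> partition = lookupPartitionState(rec);
+ *             T value = deserializer.deserialize(rec.key(), rec.value(),
+ *                 rec.topic(), rec.partition(), rec.offset());
+ *             // emits under the checkpoint lock and updates the partition's offset state
+ *             emitRecord(value, partition, rec.offset());
+ *         }
+ *     }
+ * }
+ * }</pre>
+ *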
+ * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into
+ * the Flink data streams.
+ * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
+ */
+public abstract class AbstractFetcher<T, KPH> {
+
+ protected static final int NO_TIMESTAMPS_WATERMARKS = 0;
+ protected static final int PERIODIC_WATERMARKS = 1;
+ protected static final int PUNCTUATED_WATERMARKS = 2;
+
+ // ------------------------------------------------------------------------
+
+ /** The source context to emit records and watermarks to */
+ protected final SourceContext<T> sourceContext;
+
+ /** The lock that guarantees that record emission and state updates are atomic,
+ * from the view of taking a checkpoint */
+ protected final Object checkpointLock;
+
+ /** All partitions (and their state) that this fetcher is subscribed to */
+ private final KafkaTopicPartitionState<KPH>[] allPartitions;
+
+ /** The mode describing whether the fetcher also generates timestamps and watermarks */
+ protected final int timestampWatermarkMode;
+
+ /** Flag whether to register metrics for the fetcher */
+ protected final boolean useMetrics;
+
+ /** Only relevant for punctuated watermarks: The current cross partition watermark */
+ private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
+
+ // ------------------------------------------------------------------------
+
+ protected AbstractFetcher(
+ SourceContext<T> sourceContext,
+ List<KafkaTopicPartition> assignedPartitions,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval,
+ ClassLoader userCodeClassLoader,
+ boolean useMetrics) throws Exception
+ {
+ this.sourceContext = checkNotNull(sourceContext);
+ this.checkpointLock = sourceContext.getCheckpointLock();
+ this.useMetrics = useMetrics;
+
+ // figure out which watermark mode we will be using
+
+ if (watermarksPeriodic == null) {
+ if (watermarksPunctuated == null) {
+ // simple case, no watermarks involved
+ timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
+ } else {
+ timestampWatermarkMode = PUNCTUATED_WATERMARKS;
+ }
+ } else {
+ if (watermarksPunctuated == null) {
+ timestampWatermarkMode = PERIODIC_WATERMARKS;
+ } else {
+ throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
+ }
+ }
+
+ // create our partition state according to the timestamp/watermark mode
+ this.allPartitions = initializePartitions(
+ assignedPartitions,
+ timestampWatermarkMode,
+ watermarksPeriodic, watermarksPunctuated,
+ userCodeClassLoader);
+
+ // if we have periodic watermarks, kick off the interval scheduler
+ if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+ KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts =
+ (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
+
+ PeriodicWatermarkEmitter periodicEmitter =
+ new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
+ periodicEmitter.start();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets all partitions (with partition state) that this fetcher is subscribed to.
+ *
+ * @return All subscribed partitions.
+ */
+ protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
+ return allPartitions;
+ }
+
+ // ------------------------------------------------------------------------
+ // Core fetcher work methods
+ // ------------------------------------------------------------------------
+
+ public abstract void runFetchLoop() throws Exception;
+
+ public abstract void cancel();
+
+ // ------------------------------------------------------------------------
+ // Kafka version specifics
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates the Kafka version specific representation of the given
+ * topic partition.
+ *
+ * @param partition The Flink representation of the Kafka topic partition.
+ * @return The specific Kafka representation of the Kafka topic partition.
+ */
+ public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);
+
+ /**
+ * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
+ * older Kafka versions). The given offsets are the internal checkpointed offsets, representing
+ * the last processed record of each partition. Version-specific implementations of this method
+ * need to hold the contract that the given offsets must be incremented by 1 before
+ * committing them, so that committed offsets to Kafka represent "the next record to process".
+ *
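+ * <p>A minimal sketch of the expected +1 adjustment (illustrative only; {@code toKafkaPartition}
+ * and the committing client are assumptions of a version-specific implementation):
+ * <pre>{@code
+ * Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
+ * for (Map.Entry<KafkaTopicPartition, Long> entry : offsets.entrySet()) {
+ *     // +1: the committed offset must point to "the next record to process"
+ *     toCommit.put(toKafkaPartition(entry.getKey()), new OffsetAndMetadata(entry.getValue() + 1));
+ * }
+ * consumer.commitSync(toCommit);
+ * }</pre>
+ *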
+ * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
+ * @throws Exception This method forwards exceptions.
+ */
+ public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+
+ // ------------------------------------------------------------------------
+ // snapshot and restore the state
+ // ------------------------------------------------------------------------
+
+ /**
+ * Takes a snapshot of the partition offsets.
+ *
+ * <p>Important: This method must be called under the checkpoint lock.
+ *
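+ * <p>A minimal usage sketch, assuming the caller holds the same lock object that was
+ * handed to this fetcher as the checkpoint lock:
+ * <pre>{@code
+ * synchronized (checkpointLock) {
+ *     HashMap<KafkaTopicPartition, Long> offsets = fetcher.snapshotCurrentState();
+ * }
+ * }</pre>
+ *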
+ * @return A map from partition to current offset.
+ */
+ public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+ // this method assumes that the checkpoint lock is held
+ assert Thread.holdsLock(checkpointLock);
+
+ HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
+ for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
+ state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+ }
+ return state;
+ }
+
+ /**
+ * Restores the partition offsets.
+ *
+ * @param snapshotState The offsets for the partitions
+ */
+ public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
+ for (KafkaTopicPartitionState<?> partition : allPartitions) {
+ Long offset = snapshotState.get(partition.getKafkaTopicPartition());
+ if (offset != null) {
+ partition.setOffset(offset);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // emitting records
+ // ------------------------------------------------------------------------
+
+ /**
+ * Emits a record without attaching an existing timestamp to it.
+ *
+ * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
+ * That makes the fast path efficient; the extended paths are called as separate methods.
+ *
+ * @param record The record to emit
+ * @param partitionState The state of the Kafka partition from which the record was fetched
+ * @param offset The offset of the record
+ */
+ protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
+ if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+ // fast path logic, in case there are no watermarks
+
+ // emit the record, using the checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collect(record);
+ partitionState.setOffset(offset);
+ }
+ }
+ else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+ emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
+ }
+ else {
+ emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
+ }
+ }
+
+ /**
+ * Emits a record attaching a timestamp to it.
+ *
+ * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
+ * That makes the fast path efficient; the extended paths are called as separate methods.
+ *
+ * @param record The record to emit
+ * @param partitionState The state of the Kafka partition from which the record was fetched
+ * @param offset The offset of the record
+ * @param timestamp The timestamp to attach to the record
+ */
+ protected void emitRecordWithTimestamp(
+ T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
+
+ if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+ // fast path logic, in case there are no watermarks generated in the fetcher
+
+ // emit the record, using the checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collectWithTimestamp(record, timestamp);
+ partitionState.setOffset(offset);
+ }
+ }
+ else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+ emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
+ }
+ else {
+ emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
+ }
+ }
+
+ /**
+ * Record emission for the case where the timestamp is attached by an assigner
+ * that is also a periodic watermark generator.
+ */
+ protected void emitRecordWithTimestampAndPeriodicWatermark(
+ T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
+ {
+ @SuppressWarnings("unchecked")
+ final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
+ (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
+
+ // extract timestamp - this accesses/modifies the per-partition state inside the
+ // watermark generator instance, so we need to lock the access on the
+ // partition state. concurrent access can happen from the periodic emitter
+ final long timestamp;
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (withWatermarksState) {
+ timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
+ }
+
+ // emit the record with timestamp, using the usual checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collectWithTimestamp(record, timestamp);
+ partitionState.setOffset(offset);
+ }
+ }
+
+ /**
+ * Record emission for the case where the timestamp is attached by an assigner
+ * that is also a punctuated watermark generator.
+ */
+ protected void emitRecordWithTimestampAndPunctuatedWatermark(
+ T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
+ {
+ @SuppressWarnings("unchecked")
+ final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+ (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
+
+ // only one thread ever works on accessing timestamps and watermarks
+ // from the punctuated extractor
+ final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
+ final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
+
+ // emit the record with timestamp, using the usual checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collectWithTimestamp(record, timestamp);
+ partitionState.setOffset(offset);
+ }
+
+ // if we also have a new per-partition watermark, check if that is also a
+ // new cross-partition watermark
+ if (newWatermark != null) {
+ updateMinPunctuatedWatermark(newWatermark);
+ }
+ }
+
+ /**
+ * Checks whether a new per-partition watermark is also a new cross-partition watermark.
+ */
+ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
+ if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
+ long newMin = Long.MAX_VALUE;
+
+ for (KafkaTopicPartitionState<?> state : allPartitions) {
+ @SuppressWarnings("unchecked")
+ final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+ (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
+
+ newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
+ }
+
+ // double-check locking pattern
+ if (newMin > maxWatermarkSoFar) {
+ synchronized (checkpointLock) {
+ if (newMin > maxWatermarkSoFar) {
+ maxWatermarkSoFar = newMin;
+ sourceContext.emitWatermark(new Watermark(newMin));
+ }
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Utility method that takes the topic partitions and creates the topic partition state
+ * holders. If a watermark generator per partition exists, this will also initialize those.
+ */
+ private KafkaTopicPartitionState<KPH>[] initializePartitions(
+ List<KafkaTopicPartition> assignedPartitions,
+ int timestampWatermarkMode,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ ClassLoader userCodeClassLoader)
+ throws IOException, ClassNotFoundException
+ {
+ switch (timestampWatermarkMode) {
+
+ case NO_TIMESTAMPS_WATERMARKS: {
+ @SuppressWarnings("unchecked")
+ KafkaTopicPartitionState<KPH>[] partitions =
+ (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
+
+ int pos = 0;
+ for (KafkaTopicPartition partition : assignedPartitions) {
+ // create the kafka version specific partition handle
+ KPH kafkaHandle = createKafkaPartitionHandle(partition);
+ partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle);
+ }
+
+ return partitions;
+ }
+
+ case PERIODIC_WATERMARKS: {
+ @SuppressWarnings("unchecked")
+ KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+ (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
+ new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
+
+ int pos = 0;
+ for (KafkaTopicPartition partition : assignedPartitions) {
+ KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
+ AssignerWithPeriodicWatermarks<T> assignerInstance =
+ watermarksPeriodic.deserializeValue(userCodeClassLoader);
+
+ partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+ partition, kafkaHandle, assignerInstance);
+ }
+
+ return partitions;
+ }
+
+ case PUNCTUATED_WATERMARKS: {
+ @SuppressWarnings("unchecked")
+ KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+ (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
+ new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()];
+
+ int pos = 0;
+ for (KafkaTopicPartition partition : assignedPartitions) {
+ KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
+ AssignerWithPunctuatedWatermarks<T> assignerInstance =
+ watermarksPunctuated.deserializeValue(userCodeClassLoader);
+
+ partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+ partition, kafkaHandle, assignerInstance);
+ }
+
+ return partitions;
+ }
+ default:
+ // cannot happen, add this as a guard for the future
+ throw new RuntimeException();
+ }
+ }
+
+ // ------------------------- Metrics ----------------------------------
+
+ /**
+ * Adds current and committed offsets to the given metric group.
+ *
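+ * <p>A sketch of how a version-specific fetcher might register these gauges
+ * (assumes a {@code runtimeContext} is available to the caller):
+ * <pre>{@code
+ * if (useMetrics) {
+ *     addOffsetStateGauge(runtimeContext.getMetricGroup().addGroup("KafkaConsumer"));
+ * }
+ * }</pre>
+ *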
+ * @param metricGroup The metric group to use
+ */
+ protected void addOffsetStateGauge(MetricGroup metricGroup) {
+ // register current and committed offsets as gauges
+ MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
+ MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
+ for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
+ currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
+ committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
+ }
+ }
+
+ /**
+ * Gauge types
+ */
+ private enum OffsetGaugeType {
+ CURRENT_OFFSET,
+ COMMITTED_OFFSET
+ }
+
+ /**
+ * Gauge for getting the offset of a KafkaTopicPartitionState.
+ */
+ private static class OffsetGauge implements Gauge<Long> {
+
+ private final KafkaTopicPartitionState<?> ktp;
+ private final OffsetGaugeType gaugeType;
+
+ public OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
+ this.ktp = ktp;
+ this.gaugeType = gaugeType;
+ }
+
+ @Override
+ public Long getValue() {
+ switch(gaugeType) {
+ case COMMITTED_OFFSET:
+ return ktp.getCommittedOffset();
+ case CURRENT_OFFSET:
+ return ktp.getOffset();
+ default:
+ throw new RuntimeException("Unknown gauge type: " + gaugeType);
+ }
+ }
+ }
+ // ------------------------------------------------------------------------
+
+ /**
+ * The periodic watermark emitter. In its given interval, it checks all partitions for
+ * the current event time watermark, and possibly emits the next watermark.
+ */
+ private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
+
+ private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
+
+ private final SourceContext<?> emitter;
+
+ private final ProcessingTimeService timerService;
+
+ private final long interval;
+
+ private long lastWatermarkTimestamp;
+
+ //-------------------------------------------------
+
+ PeriodicWatermarkEmitter(
+ KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
+ SourceContext<?> emitter,
+ ProcessingTimeService timerService,
+ long autoWatermarkInterval)
+ {
+ this.allPartitions = checkNotNull(allPartitions);
+ this.emitter = checkNotNull(emitter);
+ this.timerService = checkNotNull(timerService);
+ this.interval = autoWatermarkInterval;
+ this.lastWatermarkTimestamp = Long.MIN_VALUE;
+ }
+
+ //-------------------------------------------------
+
+ public void start() {
+ timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+ }
+
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+
+ long minAcrossAll = Long.MAX_VALUE;
+ for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
+
+ // we access the current watermark for the periodic assigners under the state
+ // lock, to prevent concurrent modification to any internal variables
+ final long curr;
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (state) {
+ curr = state.getCurrentWatermarkTimestamp();
+ }
+
+ minAcrossAll = Math.min(minAcrossAll, curr);
+ }
+
+ // emit next watermark, if there is one
+ if (minAcrossAll > lastWatermarkTimestamp) {
+ lastWatermarkTimestamp = minAcrossAll;
+ emitter.emitWatermark(new Watermark(minAcrossAll));
+ }
+
+ // schedule the next watermark
+ timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
new file mode 100644
index 0000000..c736493
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A proxy that communicates exceptions between threads. Typically used if an exception
+ * from a spawned thread needs to be recognized by the "parent" (spawner) thread.
+ *
+ * <p>The spawned thread would set the exception via {@link #reportError(Throwable)}.
+ * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}.
+ * Optionally, the parent can pass itself in the constructor to be interrupted as soon as
+ * an exception occurs.
+ *
+ * <pre>
+ * {@code
+ *
+ * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());
+ *
+ * Thread subThread = new Thread() {
+ *
+ * public void run() {
+ * try {
+ * doSomething();
+ * } catch (Throwable t) {
+ * errorProxy.reportError(t);
+ * } finally {
+ * doSomeCleanup();
+ * }
+ * }
+ * };
+ * subThread.start();
+ *
+ * doSomethingElse();
+ * errorProxy.checkAndThrowException();
+ *
+ * doSomethingMore();
+ * errorProxy.checkAndThrowException();
+ *
+ * try {
+ * subThread.join();
+ * } catch (InterruptedException e) {
+ * errorProxy.checkAndThrowException();
+ * // restore interrupted status, if not caused by an exception
+ * Thread.currentThread().interrupt();
+ * }
+ * }
+ * </pre>
+ */
+public class ExceptionProxy {
+
+ /** The thread that should be interrupted when an exception occurs */
+ private final Thread toInterrupt;
+
+ /** The exception to throw */
+ private final AtomicReference<Throwable> exception;
+
+ /**
+ * Creates an exception proxy that interrupts the given thread upon
+ * report of an exception. The thread to interrupt may be null.
+ *
+ * @param toInterrupt The thread to interrupt upon an exception. May be null.
+ */
+ public ExceptionProxy(@Nullable Thread toInterrupt) {
+ this.toInterrupt = toInterrupt;
+ this.exception = new AtomicReference<>();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Sets the exception and interrupts the target thread,
+ * if no other exception has occurred so far.
+ *
+ * <p>The exception is only set (and the interruption is only triggered),
+ * if no other exception was set before.
+ *
+ * @param t The exception that occurred
+ */
+ public void reportError(Throwable t) {
+ // set the exception, if it is the first (and the exception is non null)
+ if (t != null && exception.compareAndSet(null, t) && toInterrupt != null) {
+ toInterrupt.interrupt();
+ }
+ }
+
+ /**
+ * Checks whether an exception has been set via {@link #reportError(Throwable)}.
+ * If yes, that exception is re-thrown by this method.
+ *
+ * @throws Exception This method re-throws the exception, if set.
+ */
+ public void checkAndThrowException() throws Exception {
+ Throwable t = exception.get();
+ if (t != null) {
+ if (t instanceof Exception) {
+ throw (Exception) t;
+ }
+ else if (t instanceof Error) {
+ throw (Error) t;
+ }
+ else {
+ throw new Exception(t);
+ }
+ }
+ }
+}