You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:26 UTC
[36/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
new file mode 100644
index 0000000..d015157
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.common.TopicAndPartition;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.kafka.common.Node;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level consumer API.
+ * The fetcher also handles the explicit communication with ZooKeeper to fetch initial offsets
+ * and to write offsets to ZooKeeper.
+ *
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
+
+ static final KafkaTopicPartitionState<TopicAndPartition> MARKER =
+ new KafkaTopicPartitionState<>(new KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1));
+
+ private static final Logger LOG = LoggerFactory.getLogger(Kafka08Fetcher.class);
+
+ // ------------------------------------------------------------------------
+
+ /** The schema to convert between Kafka's byte messages, and Flink's objects */
+ private final KeyedDeserializationSchema<T> deserializer;
+
+ /** The properties that configure the Kafka connection */
+ private final Properties kafkaConfig;
+
+ /** The subtask's runtime context */
+ private final RuntimeContext runtimeContext;
+
+ /** The queue of partitions that are currently not assigned to a broker connection */
+ private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
+
+ /** The behavior to use in case that an offset is not valid (any more) for a partition */
+ private final long invalidOffsetBehavior;
+
+ /** The interval in which to automatically commit (-1 if deactivated) */
+ private final long autoCommitInterval;
+
+ /** The handler that reads/writes offsets from/to ZooKeeper */
+ private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
+
+ /** Flag to track the main work loop as alive */
+ private volatile boolean running = true;
+
+
+ public Kafka08Fetcher(
+ SourceContext<T> sourceContext,
+ List<KafkaTopicPartition> assignedPartitions,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ StreamingRuntimeContext runtimeContext,
+ KeyedDeserializationSchema<T> deserializer,
+ Properties kafkaProperties,
+ long invalidOffsetBehavior,
+ long autoCommitInterval,
+ boolean useMetrics) throws Exception
+ {
+ super(
+ sourceContext,
+ assignedPartitions,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ runtimeContext.getProcessingTimeService(),
+ runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+ runtimeContext.getUserCodeClassLoader(),
+ useMetrics);
+
+ this.deserializer = checkNotNull(deserializer);
+ this.kafkaConfig = checkNotNull(kafkaProperties);
+ this.runtimeContext = runtimeContext;
+ this.invalidOffsetBehavior = invalidOffsetBehavior;
+ this.autoCommitInterval = autoCommitInterval;
+ this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
+
+ // initially, all these partitions are not assigned to a specific broker connection
+ for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+ unassignedPartitionsQueue.add(partition);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Main Work Loop
+ // ------------------------------------------------------------------------
+
+ /**
+  * Runs the main work loop of this fetcher on the calling thread.
+  *
+  * <p>The method creates the ZooKeeper offset handler, seeds all partitions that have no
+  * offset defined with the offsets committed in ZooKeeper, starts the periodic offset
+  * committer (if the auto commit interval is positive), and then keeps handing unassigned
+  * partitions to per-broker {@link SimpleConsumerThread}s. The loop ends when the fetcher is
+  * cancelled, when an error is reported, or when no broker thread and no unassigned
+  * partition is left. On exit, all spawned threads are cancelled and joined, and the
+  * ZooKeeper offset handler is closed.
+  *
+  * @throws Exception An error forwarded through the {@link ExceptionProxy} by one of the
+  *                   spawned threads, or any exception thrown by the steps described above.
+  */
+ @Override
+ public void runFetchLoop() throws Exception {
+     // the map from broker to the thread that is connected to that broker
+     final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>();
+
+     // this holds possible exceptions from the concurrently running threads
+     // (broker connection threads and the periodic offset committer)
+     final ExceptionProxy errorHandler = new ExceptionProxy(Thread.currentThread());
+
+     // the offset handler handles the communication with ZooKeeper, to commit externally visible offsets
+     final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig);
+     // publish the handler through the volatile field, where commitInternalOffsetsToKafka() reads it
+     this.zookeeperOffsetHandler = zookeeperOffsetHandler;
+
+     PeriodicOffsetCommitter periodicCommitter = null;
+     try {
+         // read offsets from ZooKeeper for partitions that did not restore offsets
+         {
+             List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
+             for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+                 if (!partition.isOffsetDefined()) {
+                     partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+                 }
+             }
+
+             Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
+             for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+                 Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
+                 if (zkOffset != null) {
+                     // the offset in ZK represents the "next record to process", so we need to subtract 1
+                     // from it to correctly represent our internally checkpointed offsets
+                     partition.setOffset(zkOffset - 1);
+                 }
+             }
+         }
+
+         // start the periodic offset committer thread, if necessary
+         if (autoCommitInterval > 0) {
+             LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);
+
+             periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler,
+                     subscribedPartitions(), errorHandler, autoCommitInterval);
+             periodicCommitter.setName("Periodic Kafka partition offset committer");
+             periodicCommitter.setDaemon(true);
+             periodicCommitter.start();
+         }
+
+         // register offset metrics
+         if (useMetrics) {
+             final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
+             addOffsetStateGauge(kafkaMetricGroup);
+         }
+
+         // Main loop polling elements from the unassignedPartitions queue to the threads
+         while (running) {
+             // re-throw any exception from the concurrent fetcher threads
+             errorHandler.checkAndThrowException();
+
+             // wait for max 5 seconds trying to get partitions to assign
+             // if threads shut down, this poll returns earlier, because the threads inject the
+             // special marker into the queue
+             List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign =
+                     unassignedPartitionsQueue.getBatchBlocking(5000);
+
+             // More than one marker may have piled up since the last poll: cancel() adds one,
+             // and broker threads inject one when they shut down. List.remove(Object) drops
+             // only the first occurrence, so remove until none is left. A leftover marker
+             // would otherwise be treated as a real partition of topic "n/a" in the leader
+             // lookup below and fail the fetcher.
+             while (partitionsToAssign.remove(MARKER)) {
+                 // keep removing markers
+             }
+
+             if (!partitionsToAssign.isEmpty()) {
+                 LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size());
+                 Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders =
+                         findLeaderForPartitions(partitionsToAssign, kafkaConfig);
+
+                 // assign the partitions to the leaders (maybe start the threads)
+                 for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader :
+                         partitionsWithLeaders.entrySet())
+                 {
+                     final Node leader = partitionsWithLeader.getKey();
+                     final List<KafkaTopicPartitionState<TopicAndPartition>> partitions = partitionsWithLeader.getValue();
+                     SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader);
+
+                     // do not start further threads once the fetcher has been cancelled
+                     if (!running) {
+                         break;
+                     }
+
+                     if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
+                         // start new thread
+                         brokerThread = createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
+                         brokerToThread.put(leader, brokerThread);
+                     }
+                     else {
+                         // put elements into queue of thread
+                         ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue =
+                                 brokerThread.getNewPartitionsQueue();
+
+                         for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
+                             if (!newPartitionsQueue.addIfOpen(fp)) {
+                                 // we were unable to add the partition to the broker's queue
+                                 // the broker has closed in the meantime (the thread will shut down)
+                                 // create a new thread for connecting to this broker
+                                 List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<>();
+                                 seedPartitions.add(fp);
+                                 brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
+                                 brokerToThread.put(leader, brokerThread);
+                                 newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions
+                             }
+                         }
+                     }
+                 }
+             }
+             else {
+                 // there were no partitions to assign. Check if any broker threads shut down.
+                 // we get into this section of the code, if either the poll timed out, or the
+                 // blocking poll was woken up by the marker element
+                 Iterator<SimpleConsumerThread<T>> bttIterator = brokerToThread.values().iterator();
+                 while (bttIterator.hasNext()) {
+                     SimpleConsumerThread<T> thread = bttIterator.next();
+                     if (!thread.getNewPartitionsQueue().isOpen()) {
+                         LOG.info("Removing stopped consumer thread {}", thread.getName());
+                         bttIterator.remove();
+                     }
+                 }
+             }
+
+             if (brokerToThread.size() == 0 && unassignedPartitionsQueue.isEmpty()) {
+                 if (unassignedPartitionsQueue.close()) {
+                     LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher");
+                     break;
+                 }
+                 // we end up here if somebody added something to the queue in the meantime --> continue to poll queue again
+             }
+         }
+     }
+     catch (InterruptedException e) {
+         // this may be thrown because an exception on one of the concurrent fetcher threads
+         // woke this thread up. make sure we throw the root exception instead in that case
+         errorHandler.checkAndThrowException();
+
+         // no other root exception, throw the interrupted exception
+         throw e;
+     }
+     finally {
+         this.running = false;
+         this.zookeeperOffsetHandler = null;
+
+         // if we run a periodic committer thread, shut that down
+         if (periodicCommitter != null) {
+             periodicCommitter.shutdown();
+         }
+
+         // clear the interruption flag
+         // this allows the joining on consumer threads (on best effort) below to happen
+         // even in case this thread has already been interrupted
+         Thread.interrupted();
+
+         // make sure that in any case (completion, abort, error), all spawned threads are stopped
+         try {
+             int runningThreads;
+             do {
+                 // check whether threads are alive and cancel them
+                 runningThreads = 0;
+                 Iterator<SimpleConsumerThread<T>> threads = brokerToThread.values().iterator();
+                 while (threads.hasNext()) {
+                     SimpleConsumerThread<?> t = threads.next();
+                     if (t.isAlive()) {
+                         t.cancel();
+                         runningThreads++;
+                     } else {
+                         threads.remove();
+                     }
+                 }
+
+                 // wait for the threads to finish, before issuing a cancel call again
+                 // (the wait of roughly 500 ms per round is split across the live threads)
+                 if (runningThreads > 0) {
+                     for (SimpleConsumerThread<?> t : brokerToThread.values()) {
+                         t.join(500 / runningThreads + 1);
+                     }
+                 }
+             }
+             while (runningThreads > 0);
+         }
+         catch (InterruptedException ignored) {
+             // waiting for the thread shutdown apparently got interrupted
+             // restore interrupted state and continue
+             Thread.currentThread().interrupt();
+         }
+         catch (Throwable t) {
+             // we catch all here to preserve the original exception
+             LOG.error("Exception while shutting down consumer threads", t);
+         }
+
+         try {
+             // the local variable is used here, because the field has been cleared above
+             zookeeperOffsetHandler.close();
+         }
+         catch (Throwable t) {
+             // we catch all here to preserve the original exception
+             LOG.error("Exception while shutting down ZookeeperOffsetHandler", t);
+         }
+     }
+ }
+
+ /**
+  * Asks the fetch loop to terminate: the loop flag is cleared first, then the marker element
+  * is offered to the queue of unassigned partitions (if that queue is still open), so that a
+  * loop blocked on the queue wakes up and observes the cleared flag.
+  */
+ @Override
+ public void cancel() {
+     // the flag has to be cleared before the wake-up below
+     running = false;
+
+     // the marker is not a real partition, it only ends the blocking poll early
+     unassignedPartitionsQueue.addIfOpen(MARKER);
+ }
+
+ // ------------------------------------------------------------------------
+ // Kafka 0.8 specific class instantiation
+ // ------------------------------------------------------------------------
+
+ /**
+  * Builds the Kafka 0.8 partition handle for the given partition.
+  *
+  * @param partition The partition to create the handle for.
+  * @return A {@code TopicAndPartition} with the topic and partition id of the given partition.
+  */
+ @Override
+ public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
+     final String topic = partition.getTopic();
+     final int partitionId = partition.getPartition();
+     return new TopicAndPartition(topic, partitionId);
+ }
+
+ // ------------------------------------------------------------------------
+ // Offset handling
+ // ------------------------------------------------------------------------
+
+ /**
+  * Commits the given offsets to ZooKeeper (if the offset handler is currently available) and
+  * records them as committed offsets in the state of the subscribed partitions.
+  *
+  * @param offsets The offsets to commit, per partition.
+  * @throws Exception Any exception thrown while committing the offsets.
+  */
+ @Override
+ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+     // read the volatile field once; the fetch loop clears it when it terminates
+     final ZookeeperOffsetHandler handler = this.zookeeperOffsetHandler;
+     if (handler != null) {
+         // the ZK handler takes care of incrementing the offsets by 1 before committing
+         handler.prepareAndCommitOffsets(offsets);
+     }
+
+     // remember the committed offset for every partition that was part of this commit
+     for (KafkaTopicPartitionState<TopicAndPartition> state : subscribedPartitions()) {
+         final Long committed = offsets.get(state.getKafkaTopicPartition());
+         if (committed == null) {
+             continue;
+         }
+         state.setCommittedOffset(committed);
+     }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+  * Creates, names and starts a daemon thread that fetches the given partitions from one broker.
+  *
+  * @param seedPartitions The partitions the new thread starts with.
+  * @param leader The broker the thread connects to.
+  * @param errorHandler The proxy handed to the thread for reporting errors.
+  * @return The thread, already started.
+  * @throws IOException Declared for the cloning of the deserializer.
+  * @throws ClassNotFoundException Declared for the cloning of the deserializer.
+  */
+ private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
+         List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
+         Node leader,
+         ExceptionProxy errorHandler) throws IOException, ClassNotFoundException
+ {
+     // the deserializer is not necessarily thread safe, so every thread works on a private copy
+     final KeyedDeserializationSchema<T> deserializerCopy =
+             InstantiationUtil.clone(deserializer, runtimeContext.getUserCodeClassLoader());
+
+     // the thread is seeded with partitions (otherwise it would shut down immediately again)
+     final SimpleConsumerThread<T> consumerThread = new SimpleConsumerThread<>(
+             this, errorHandler, kafkaConfig, leader, seedPartitions, unassignedPartitionsQueue,
+             deserializerCopy, invalidOffsetBehavior);
+
+     final String threadName = String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
+             runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port());
+     consumerThread.setName(threadName);
+     consumerThread.setDaemon(true);
+     consumerThread.start();
+
+     LOG.info("Starting thread {}", consumerThread.getName());
+     return consumerThread;
+ }
+
+ /**
+ * Returns a list of unique topics from for the given partitions
+ *
+ * @param partitions A the partitions
+ * @return A list of unique topics
+ */
+ private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
+ HashSet<String> uniqueTopics = new HashSet<>();
+ for (KafkaTopicPartitionState<TopicAndPartition> fp: partitions) {
+ uniqueTopics.add(fp.getTopic());
+ }
+ return new ArrayList<>(uniqueTopics);
+ }
+
+ /**
+ * Find leaders for the partitions
+ *
+ * From a high level, the method does the following:
+ * - Get a list of FetchPartitions (usually only a few partitions)
+ * - Get the list of topics from the FetchPartitions list and request the partitions for the topics. (Kafka doesn't support getting leaders for a set of partitions)
+ * - Build a Map<Leader, List<FetchPartition>> where only the requested partitions are contained.
+ *
+ * @param partitionsToAssign fetch partitions list
+ * @return leader to partitions map
+ */
+ private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(
+ List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign,
+ Properties kafkaProperties) throws Exception
+ {
+ if (partitionsToAssign.isEmpty()) {
+ throw new IllegalArgumentException("Leader request for empty partitions list");
+ }
+
+ LOG.info("Refreshing leader information for partitions {}", partitionsToAssign);
+
+ // this request is based on the topic names
+ PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(getTopics(partitionsToAssign), kafkaProperties);
+ infoFetcher.start();
+
+ // NOTE: The kafka client apparently locks itself up sometimes
+ // when it is interrupted, so we run it only in a separate thread.
+ // since it sometimes refuses to shut down, we resort to the admittedly harsh
+ // means of killing the thread after a timeout.
+ KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
+ watchDog.start();
+
+ // this list contains ALL partitions of the requested topics
+ List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
+
+ // copy list to track unassigned partitions
+ List<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions = new ArrayList<>(partitionsToAssign);
+
+ // final mapping from leader -> list(fetchPartition)
+ Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions = new HashMap<>();
+
+ for(KafkaTopicPartitionLeader partitionLeader: topicPartitionWithLeaderList) {
+ if (unassignedPartitions.size() == 0) {
+ // we are done: all partitions are assigned
+ break;
+ }
+
+ Iterator<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsIterator = unassignedPartitions.iterator();
+ while (unassignedPartitionsIterator.hasNext()) {
+ KafkaTopicPartitionState<TopicAndPartition> unassignedPartition = unassignedPartitionsIterator.next();
+
+ if (unassignedPartition.getKafkaTopicPartition().equals(partitionLeader.getTopicPartition())) {
+ // we found the leader for one of the fetch partitions
+ Node leader = partitionLeader.getLeader();
+
+ List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = leaderToPartitions.get(leader);
+ if (partitionsOfLeader == null) {
+ partitionsOfLeader = new ArrayList<>();
+ leaderToPartitions.put(leader, partitionsOfLeader);
+ }
+ partitionsOfLeader.add(unassignedPartition);
+ unassignedPartitionsIterator.remove(); // partition has been assigned
+ break;
+ }
+ }
+ }
+
+ if (unassignedPartitions.size() > 0) {
+ throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions);
+ }
+
+ LOG.debug("Partitions with assigned leaders {}", leaderToPartitions);
+
+ return leaderToPartitions;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
new file mode 100644
index 0000000..4d61e53
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * A watch dog thread that forcibly kills another thread, if that thread does not
+ * finish in time.
+ *
+ * <p>This uses the discouraged {@link Thread#stop()} method. While this is not
+ * advisable, this watch dog is only for extreme cases of threads that simply
+ * do not terminate otherwise.
+ */
+class KillerWatchDog extends Thread {
+
+    /** The thread that is killed if it outlives the timeout. */
+    private final Thread toKill;
+
+    /** The time, in milliseconds, that the watched thread is granted to finish. */
+    private final long timeout;
+
+    /**
+     * Creates the watch dog as a daemon thread; it has to be started by the caller.
+     *
+     * @param toKill The thread to watch.
+     * @param timeout The time in milliseconds, counted from the start of the watch dog,
+     *                after which the watched thread is killed. Values of zero or less
+     *                kill a live thread right away.
+     */
+    KillerWatchDog(Thread toKill, long timeout) {
+        super("KillerWatchDog");
+        setDaemon(true);
+
+        this.toKill = toKill;
+        this.timeout = timeout;
+    }
+
+    /**
+     * Waits until the watched thread has finished or the timeout has elapsed, and stops the
+     * watched thread if it is still alive afterwards.
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    public void run() {
+        // Elapsed time is measured with the monotonic clock, so adjustments of the system
+        // time neither shorten nor prolong the grace period. Comparing the elapsed time
+        // against the timeout (instead of computing "now + timeout") also avoids the
+        // overflow that a very large timeout would cause.
+        final long startNanos = System.nanoTime();
+
+        while (toKill.isAlive()) {
+            final long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
+            if (elapsedMillis >= timeout) {
+                break;
+            }
+
+            try {
+                // the argument is always >= 1 here; join(0) would wait without a limit
+                toKill.join(timeout - elapsedMillis);
+            }
+            catch (InterruptedException ignored) {
+                // ignore here, our job is important!
+            }
+        }
+
+        // this is harsh, but this watchdog is a last resort
+        if (toKill.isAlive()) {
+            toKill.stop();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
new file mode 100644
index 0000000..d8d927d
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+
+import java.util.List;
+import java.util.Properties;
+
+class PartitionInfoFetcher extends Thread {
+
+ private final List<String> topics;
+ private final Properties properties;
+
+ private volatile List<KafkaTopicPartitionLeader> result;
+ private volatile Throwable error;
+
+
+ PartitionInfoFetcher(List<String> topics, Properties properties) {
+ this.topics = topics;
+ this.properties = properties;
+ }
+
+ @Override
+ public void run() {
+ try {
+ result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
+ }
+ catch (Throwable t) {
+ this.error = t;
+ }
+ }
+
+ public List<KafkaTopicPartitionLeader> getPartitions() throws Exception {
+ try {
+ this.join();
+ }
+ catch (InterruptedException e) {
+ throw new Exception("Partition fetching was cancelled before completion");
+ }
+
+ if (error != null) {
+ throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error);
+ }
+ if (result != null) {
+ return result;
+ }
+ throw new Exception("Partition fetching failed");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
new file mode 100644
index 0000000..27d90f2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread that writes the current offsets of a fixed set of Kafka partitions to ZooKeeper
+ * in regular intervals, until it is shut down.
+ */
+public class PeriodicOffsetCommitter extends Thread {
+
+    /** The ZooKeeper handler */
+    private final ZookeeperOffsetHandler offsetHandler;
+
+    /** The states of the partitions whose offsets are committed */
+    private final KafkaTopicPartitionState<?>[] partitionStates;
+
+    /** The proxy to forward exceptions to the main thread */
+    private final ExceptionProxy errorHandler;
+
+    /** Interval in which to commit, in milliseconds */
+    private final long commitInterval;
+
+    /** Flag to mark the periodic committer as running */
+    private volatile boolean running = true;
+
+    /**
+     * Creates the committer; the thread has to be started by the caller.
+     *
+     * @param offsetHandler The handler used to commit the offsets; must not be null.
+     * @param partitionStates The partition states to read the offsets from; must not be null.
+     * @param errorHandler The proxy that errors are reported to; must not be null.
+     * @param commitInterval The pause between two commits, in milliseconds; must be positive.
+     */
+    PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler,
+            KafkaTopicPartitionState<?>[] partitionStates,
+            ExceptionProxy errorHandler,
+            long commitInterval)
+    {
+        this.offsetHandler = checkNotNull(offsetHandler);
+        this.partitionStates = checkNotNull(partitionStates);
+        this.errorHandler = checkNotNull(errorHandler);
+
+        checkArgument(commitInterval > 0);
+        this.commitInterval = commitInterval;
+    }
+
+    /**
+     * Sleeps for the commit interval, snapshots the offsets of all partitions and commits the
+     * snapshot, over and over. An error ends the thread; it is reported to the error handler
+     * unless the committer has been shut down in the meantime.
+     */
+    @Override
+    public void run() {
+        try {
+            while (running) {
+                Thread.sleep(commitInterval);
+
+                // snapshot the current offsets into a fresh map
+                final HashMap<KafkaTopicPartition, Long> snapshot = new HashMap<>(partitionStates.length);
+                for (int i = 0; i < partitionStates.length; i++) {
+                    final KafkaTopicPartitionState<?> state = partitionStates[i];
+                    snapshot.put(state.getKafkaTopicPartition(), state.getOffset());
+                }
+
+                offsetHandler.prepareAndCommitOffsets(snapshot);
+            }
+        }
+        catch (Throwable t) {
+            // errors after shutdown() (such as the interruption of the sleep) are expected
+            if (!running) {
+                return;
+            }
+            errorHandler.reportError(
+                    new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
+        }
+    }
+
+    /**
+     * Stops the committer: clears the running flag and interrupts the thread.
+     */
+    public void shutdown() {
+        running = false;
+        interrupt();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
new file mode 100644
index 0000000..35e491a
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.kafka.common.Node;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.PropertiesUtil.getInt;
+
+/**
+ * This class implements a thread with a connection to a single Kafka broker. The thread
+ * pulls records for a set of topic partitions for which the connected broker is currently
+ * the leader. The thread deserializes these records and emits them.
+ *
+ * @param <T> The type of elements that this consumer thread creates from Kafka's byte messages
+ * and emits into the Flink DataStream.
+ */
+class SimpleConsumerThread<T> extends Thread {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class);
+
+ private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER;
+
+ // ------------------------------------------------------------------------
+
+ private final Kafka08Fetcher<T> owner;
+
+ private final KeyedDeserializationSchema<T> deserializer;
+
+ private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions;
+
+ private final Node broker;
+
+ /** Queue containing new fetch partitions for the consumer thread */
+ private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue;
+
+ private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions;
+
+ private final ExceptionProxy errorHandler;
+
+ private final long invalidOffsetBehavior;
+
+ private volatile boolean running = true;
+
+
+ // ----------------- Simple Consumer ----------------------
+ private volatile SimpleConsumer consumer;
+
+ private final int soTimeout;
+ private final int minBytes;
+ private final int maxWait;
+ private final int fetchSize;
+ private final int bufferSize;
+ private final int reconnectLimit;
+
+
+ /**
+  * Creates a consumer thread for one broker. The broker connection is only opened in {@link #run()}.
+  *
+  * <p>Exceptions are thrown locally: invalid arguments fail directly in this constructor rather
+  * than being routed through the error handler.
+  *
+  * @param owner                fetcher that deserialized records are emitted through
+  * @param errorHandler         proxy that failures of the running thread are reported to
+  * @param config               consumer properties; the socket and fetch settings are read from it
+  * @param broker               broker this thread connects to
+  * @param seedPartitions       initial partitions to fetch; the list is used directly and modified by the thread
+  * @param unassignedPartitions queue that partitions are handed back to when this thread gives them up
+  * @param deserializer         schema that turns key/value bytes into records
+  * @param invalidOffsetBehavior time constant used for the offset lookup of partitions with a missing or invalid offset
+  */
+ public SimpleConsumerThread(
+         Kafka08Fetcher<T> owner,
+         ExceptionProxy errorHandler,
+         Properties config,
+         Node broker,
+         List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
+         ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions,
+         KeyedDeserializationSchema<T> deserializer,
+         long invalidOffsetBehavior)
+ {
+     // validate all references up front; otherwise a null only surfaces later inside the running thread
+     this.owner = requireNonNull(owner, "owner");
+     this.errorHandler = requireNonNull(errorHandler, "errorHandler");
+     this.broker = requireNonNull(broker, "broker");
+     this.partitions = requireNonNull(seedPartitions, "seedPartitions");
+     this.deserializer = requireNonNull(deserializer, "deserializer");
+     this.unassignedPartitions = requireNonNull(unassignedPartitions, "unassignedPartitions");
+     this.newPartitionsQueue = new ClosableBlockingQueue<>();
+     this.invalidOffsetBehavior = invalidOffsetBehavior;
+
+     requireNonNull(config, "config");
+
+     // these are the actual configuration values of Kafka + their original default values.
+     this.soTimeout = getInt(config, "socket.timeout.ms", 30000);
+     this.minBytes = getInt(config, "fetch.min.bytes", 1);
+     this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
+     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
+     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
+     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+ }
+
+ /**
+  * Gives access to the queue through which additional partitions are assigned to this thread.
+  *
+  * @return the queue of newly assigned partitions, polled by the thread's main loop
+  */
+ public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
+     return this.newPartitionsQueue;
+ }
+
+ // ------------------------------------------------------------------------
+ // main work loop
+ // ------------------------------------------------------------------------
+
+ /**
+ * Main loop of the consumer thread. Connects to the broker, looks up start offsets for partitions
+ * that have none, and then repeatedly (1) picks up newly assigned partitions from the new-partitions
+ * queue, (2) sends one fetch request covering all subscribed partitions, (3) evaluates the
+ * per-partition error codes of the response, and (4) deserializes the fetched messages and emits
+ * them through the owning fetcher.
+ *
+ * <p>The loop ends when the thread is cancelled, or when no partitions are left and the
+ * new-partitions queue could be closed. Any Throwable leaving the loop is passed to the error
+ * handler, and the consumer connection (if one was created) is closed in all cases.
+ */
+ @Override
+ public void run() {
+ LOG.info("Starting to fetch from {}", this.partitions);
+
+ // set up the config values
+ final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
+
+ try {
+ // create the Kafka consumer that we actually use for fetching
+ consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+
+ // make sure that all partitions have some offsets to start with
+ // those partitions that do not have an offset yet need to get
+ // their start offset from Kafka
+ getMissingOffsetsFromKafka(partitions);
+
+ // Now, the actual work starts :-)
+ // counters for consecutive out-of-range responses and consecutive failed connection attempts
+ int offsetOutOfRangeCount = 0;
+ int reconnects = 0;
+ while (running) {
+
+ // ----------------------------------- partitions list maintenance ----------------------------
+
+ // check queue for new partitions to read from:
+ List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch();
+ if (newPartitions != null) {
+ // found some new partitions for this thread's broker
+
+ // check if the new partitions need an offset lookup
+ getMissingOffsetsFromKafka(newPartitions);
+
+ // add the new partitions (and check they are not already in there)
+ for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
+ if (partitions.contains(newPartition)) {
+ throw new IllegalStateException("Adding partition " + newPartition +
+ " to subscribed partitions even though it is already subscribed");
+ }
+ partitions.add(newPartition);
+ }
+
+ LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName());
+ LOG.debug("Partitions list: {}", newPartitions);
+ }
+
+ if (partitions.size() == 0) {
+ if (newPartitionsQueue.close()) {
+ // close succeeded. Closing thread
+ running = false;
+
+ LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.",
+ getName());
+
+ // add the wake-up marker into the queue to make the main thread
+ // immediately wake up and terminate faster
+ unassignedPartitions.add(MARKER);
+
+ break;
+ } else {
+ // close failed: fetcher main thread concurrently added new partitions into the queue.
+ // go to top of loop again and get the new partitions
+ continue;
+ }
+ }
+
+ // ----------------------------------- request / response with kafka ----------------------------
+
+ FetchRequestBuilder frb = new FetchRequestBuilder();
+ frb.clientId(clientId);
+ frb.maxWait(maxWait);
+ frb.minBytes(minBytes);
+
+ // the partition state holds the offset of the last handled record, hence the "+ 1" below
+ for (KafkaTopicPartitionState<?> partition : partitions) {
+ frb.addFetch(
+ partition.getKafkaTopicPartition().getTopic(),
+ partition.getKafkaTopicPartition().getPartition(),
+ partition.getOffset() + 1, // request the next record
+ fetchSize);
+ }
+
+ kafka.api.FetchRequest fetchRequest = frb.build();
+ LOG.debug("Issuing fetch request {}", fetchRequest);
+
+ FetchResponse fetchResponse;
+ try {
+ fetchResponse = consumer.fetch(fetchRequest);
+ }
+ catch (Throwable cce) {
+ // only a ClosedChannelException is retried; everything else is rethrown to the outer handler
+ //noinspection ConstantConditions
+ if (cce instanceof ClosedChannelException) {
+ LOG.warn("Fetch failed because of ClosedChannelException.");
+ LOG.debug("Full exception", cce);
+
+ // we don't know if the broker is overloaded or unavailable.
+ // retry a few times, then return ALL partitions for new leader lookup
+ // NOTE(review): 'reconnects' is not reset on this path; it is only reset after a successful fetch
+ if (++reconnects >= reconnectLimit) {
+ LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit);
+ for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
+ unassignedPartitions.add(fp);
+ }
+ this.partitions.clear();
+ continue; // jump to top of loop: will close thread or subscribe to new partitions
+ }
+ try {
+ consumer.close();
+ } catch (Throwable t) {
+ LOG.warn("Error while closing consumer connection", t);
+ }
+ // delay & retry
+ Thread.sleep(100);
+ consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+ continue; // retry
+ } else {
+ throw cce;
+ }
+ }
+ // the fetch call returned without exception, so the connection works again
+ reconnects = 0;
+
+ // ---------------------------------------- error handling ----------------------------
+
+ if (fetchResponse == null) {
+ throw new IOException("Fetch from Kafka failed (request returned null)");
+ }
+
+ if (fetchResponse.hasError()) {
+ // collects the messages of all error codes that are neither out-of-range nor leader related
+ String exception = "";
+ List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
+
+ // iterate over partitions to get individual error codes
+ Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
+ boolean partitionsRemoved = false;
+
+ while (partitionsIterator.hasNext()) {
+ final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
+ short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition());
+
+ if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+ // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
+ // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
+ partitionsToGetOffsetsFor.add(fp);
+ }
+ else if (code == ErrorMapping.NotLeaderForPartitionCode() ||
+ code == ErrorMapping.LeaderNotAvailableCode() ||
+ code == ErrorMapping.BrokerNotAvailableCode() ||
+ code == ErrorMapping.UnknownCode())
+ {
+ // the broker we are connected to is not the leader for the partition.
+ LOG.warn("{} is not the leader of {}. Reassigning leader for partition", broker, fp);
+ LOG.debug("Error code = {}", code);
+
+ unassignedPartitions.add(fp);
+
+ partitionsIterator.remove(); // unsubscribe the partition ourselves
+ partitionsRemoved = true;
+ }
+ else if (code != ErrorMapping.NoError()) {
+ exception += "\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " +
+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+ }
+ }
+ // out-of-range offsets take precedence: reset them and retry before anything else
+ if (partitionsToGetOffsetsFor.size() > 0) {
+ // safeguard against an infinite loop.
+ if (offsetOutOfRangeCount++ > 3) {
+ throw new RuntimeException("Found invalid offsets more than three times in partitions "
+ + partitionsToGetOffsetsFor + " Exceptions: " + exception);
+ }
+ // get valid offsets for these partitions and try again.
+ LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
+ getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
+
+ LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
+ continue; // jump back to create a new fetch request. The offset has not been touched.
+ }
+ else if (partitionsRemoved) {
+ continue; // create new fetch request
+ }
+ else {
+ // partitions failed on an error
+ throw new IOException("Error while fetching from broker '" + broker +"': " + exception);
+ }
+ } else {
+ // successful fetch, reset offsetOutOfRangeCount.
+ offsetOutOfRangeCount = 0;
+ }
+
+ // ----------------------------------- process fetch response ----------------------------
+
+ int messagesInFetch = 0;
+ int deletedMessages = 0;
+ Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
+
+ partitionsLoop:
+ while (partitionsIterator.hasNext()) {
+ final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next();
+
+ final ByteBufferMessageSet messageSet = fetchResponse.messageSet(
+ currentPartition.getTopic(), currentPartition.getPartition());
+
+ for (MessageAndOffset msg : messageSet) {
+ if (running) {
+ messagesInFetch++;
+ final ByteBuffer payload = msg.message().payload();
+ final long offset = msg.offset();
+
+ if (offset <= currentPartition.getOffset()) {
+ // we have seen this message already
+ LOG.info("Skipping message with offset " + msg.offset()
+ + " because we have seen messages until (including) "
+ + currentPartition.getOffset()
+ + " from topic/partition " + currentPartition.getTopic() + '/'
+ + currentPartition.getPartition() + " already");
+ continue;
+ }
+
+ // If the message value is null, this represents a delete command for the message key.
+ // Log this and pass it on to the client who might want to also receive delete messages.
+ byte[] valueBytes;
+ if (payload == null) {
+ deletedMessages++;
+ valueBytes = null;
+ } else {
+ valueBytes = new byte[payload.remaining()];
+ payload.get(valueBytes);
+ }
+
+ // put key into byte array
+ byte[] keyBytes = null;
+ int keySize = msg.message().keySize();
+
+ if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
+ ByteBuffer keyPayload = msg.message().key();
+ keyBytes = new byte[keySize];
+ keyPayload.get(keyBytes);
+ }
+
+ final T value = deserializer.deserialize(keyBytes, valueBytes,
+ currentPartition.getTopic(), currentPartition.getPartition(), offset);
+
+ if (deserializer.isEndOfStream(value)) {
+ // remove partition from subscribed partitions.
+ // the end-of-stream record itself is not emitted and the rest of this partition's messages is skipped
+ partitionsIterator.remove();
+ continue partitionsLoop;
+ }
+
+ owner.emitRecord(value, currentPartition, offset);
+ }
+ else {
+ // no longer running
+ return;
+ }
+ }
+ }
+ LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages);
+ } // end of fetch loop
+
+ // reached after the loop ended via 'break' or via the running flag; the queue must be closed by now
+ if (!newPartitionsQueue.close()) {
+ throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue.");
+ }
+ }
+ catch (Throwable t) {
+ // report to the fetcher's error handler
+ // (this includes an InterruptedException raised by the sleep in the reconnect path)
+ errorHandler.reportError(t);
+ }
+ finally {
+ if (consumer != null) {
+ // closing the consumer should not fail the program
+ try {
+ consumer.close();
+ }
+ catch (Throwable t) {
+ LOG.error("Error while closing the Kafka simple consumer", t);
+ }
+ }
+ }
+ }
+
+ private void getMissingOffsetsFromKafka(
+ List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
+ {
+ // collect which partitions we should fetch offsets for
+ List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
+ for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+ if (!part.isOffsetDefined()) {
+ // retrieve the offset from the consumer
+ partitionsToGetOffsetsFor.add(part);
+ }
+ }
+
+ if (partitionsToGetOffsetsFor.size() > 0) {
+ getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
+
+ LOG.info("No checkpoint/savepoint offsets found for some partitions. " +
+ "Fetched the following start offsets {}", partitionsToGetOffsetsFor);
+ }
+ }
+
+ /**
+ * Cancels this fetch thread. The thread will release all resources and terminate.
+ */
+ public void cancel() {
+ this.running = false;
+
+ // interrupt whatever the consumer is doing
+ if (consumer != null) {
+ consumer.close();
+ }
+
+ this.interrupt();
+ }
+
+ // ------------------------------------------------------------------------
+ // Kafka Request Utils
+ // ------------------------------------------------------------------------
+
+ /**
+  * Request offsets for a set of partitions, via a Kafka consumer, and store them in the partition states.
+  *
+  * <p>The request is attempted up to three times if the response has an error. The offset returned
+  * by Kafka is that of the next record to fetch; since the partition state holds the offset of the
+  * last handled record, the stored value is the returned offset minus one.
+  *
+  * @param consumer The consumer connected to lead broker
+  * @param partitions The list of partitions we need offsets for
+  * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
+  * @throws IOException if the third attempt still returns an error, or if the response contains
+  *                     no offset for one of the partitions
+  */
+ private static void getLastOffsetFromKafka(
+         SimpleConsumer consumer,
+         List<KafkaTopicPartitionState<TopicAndPartition>> partitions,
+         long whichTime) throws IOException
+ {
+     Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+     for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+         // ask for at most one offset per partition
+         requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1));
+     }
+
+     int retries = 0;
+     OffsetResponse response;
+     while (true) {
+         kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+             requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+         response = consumer.getOffsetsBefore(request);
+
+         if (response.hasError()) {
+             // collect the error of every failed partition for the log / exception message
+             StringBuilder exception = new StringBuilder();
+             for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+                 short code;
+                 if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) {
+                     exception.append("\nException for topic=").append(part.getTopic())
+                         .append(" partition=").append(part.getPartition()).append(": ")
+                         .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
+                 }
+             }
+             if (++retries >= 3) {
+                 throw new IOException("Unable to get last offset for partitions " + partitions + ": "
+                     + exception.toString());
+             } else {
+                 LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception);
+             }
+         } else {
+             break; // leave retry loop
+         }
+     }
+
+     for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) {
+         final long[] offsets = response.offsets(part.getTopic(), part.getPartition());
+
+         // an error-free response may still carry no offset; fail with context instead of an index error
+         if (offsets.length == 0) {
+             throw new IOException("Kafka returned no offset for topic=" + part.getTopic()
+                 + " partition=" + part.getPartition() + " (requested time: " + whichTime + ")");
+         }
+
+         // the offset returned is that of the next record to fetch. because our state reflects the latest
+         // successfully emitted record, we subtract one
+         part.setOffset(offsets[0] - 1);
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
new file mode 100644
index 0000000..8f2ef09
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.utils.ZKGroupTopicDirs;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Handler for committing Kafka offsets to Zookeeper and to retrieve them again.
+ *
+ * <p>Each offset is stored as a decimal string in the node
+ * {@code <consumer offset dir of group and topic>/<partition>}.
+ */
+public class ZookeeperOffsetHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
+
+    /** The consumer group under which offsets are stored. */
+    private final String groupId;
+
+    /** The Curator client, started in the constructor and closed in {@link #close()}. */
+    private final CuratorFramework curatorClient;
+
+
+    /**
+     * Creates the handler and starts its Curator client.
+     *
+     * @param props consumer properties; the group id and 'zookeeper.connect' are required, the
+     *              ZooKeeper timeouts and the "flink."-prefixed retry settings are optional
+     * @throws IllegalArgumentException if a required property is missing
+     *         (a non-numeric timeout / retry value results in a {@link NumberFormatException})
+     */
+    public ZookeeperOffsetHandler(Properties props) {
+        this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+        if (this.groupId == null) {
+            throw new IllegalArgumentException("Required property '"
+                    + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
+        }
+
+        String zkConnect = props.getProperty("zookeeper.connect");
+        if (zkConnect == null) {
+            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
+        }
+
+        // timeouts are configurable; when not set, we fall back to Curator's default timeouts
+        int sessionTimeoutMs = Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "60000"));
+        int connectionTimeoutMs = Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
+
+        // undocumented config options allowing users to configure the retry policy.
+        // (they are "flink." prefixed as they are no official kafka configs)
+        int backoffBaseSleepTime = Integer.parseInt(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
+        int backoffMaxRetries = Integer.parseInt(props.getProperty("flink.zookeeper.max-retries", "10"));
+
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
+        curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
+        curatorClient.start();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Offset access and manipulation
+    // ------------------------------------------------------------------------
+
+    /**
+     * Commits offsets for Kafka partitions to ZooKeeper. The given offsets to this method should be the offsets of
+     * the last processed records; this method will take care of incrementing the offsets by 1 before committing them so
+     * that the committed offsets to Zookeeper represent the next record to process.
+     *
+     * <p>Entries whose offset is null or negative are skipped.
+     *
+     * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit.
+     * @throws Exception The method forwards exceptions.
+     */
+    public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception {
+        for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
+            KafkaTopicPartition tp = entry.getKey();
+
+            Long lastProcessedOffset = entry.getValue();
+            if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
+                setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
+            }
+        }
+    }
+
+    /**
+     * Reads the offsets stored in ZooKeeper for the given partitions. Partitions without a
+     * (well-formed) stored offset are not contained in the result.
+     *
+     * @param partitions The partitions to read offsets for.
+     * @return The mapping from partition to offset.
+     * @throws Exception This method forwards exceptions.
+     */
+    public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
+        Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
+        for (KafkaTopicPartition tp : partitions) {
+            Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
+
+            if (offset != null) {
+                LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.",
+                        tp.getTopic(), tp.getPartition(), offset);
+                ret.put(tp, offset);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Closes the offset handler.
+     *
+     * @throws IOException Thrown, if the handler cannot be closed properly.
+     */
+    public void close() throws IOException {
+        curatorClient.close();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Communication with Zookeeper
+    // ------------------------------------------------------------------------
+
+    /**
+     * Writes the given offset, as a decimal string, into the ZooKeeper node of the partition.
+     * The node path is ensured to exist before the data is set.
+     *
+     * @param curatorClient the client used to access ZooKeeper
+     * @param groupId the consumer group
+     * @param topic the topic
+     * @param partition the partition id
+     * @param offset the offset to store (stored as given, without adjustment)
+     * @throws Exception This method forwards exceptions.
+     */
+    public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
+        ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
+        String path = topicDirs.consumerOffsetDir() + "/" + partition;
+        curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+        // use a fixed charset, so that the stored bytes do not depend on the JVM's default encoding
+        byte[] data = Long.toString(offset).getBytes(StandardCharsets.UTF_8);
+        curatorClient.setData().forPath(path, data);
+    }
+
+    /**
+     * Reads the offset stored in the ZooKeeper node of the partition. The node path is ensured
+     * to exist before it is read.
+     *
+     * @param curatorClient the client used to access ZooKeeper
+     * @param groupId the consumer group
+     * @param topic the topic
+     * @param partition the partition id
+     * @return the stored offset, or null if the node holds no data, an empty string, or a string
+     *         that is not a valid long (the latter is logged as an error)
+     * @throws Exception This method forwards exceptions.
+     */
+    public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
+        ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
+        String path = topicDirs.consumerOffsetDir() + "/" + partition;
+        curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+
+        byte[] data = curatorClient.getData().forPath(path);
+
+        if (data == null) {
+            return null;
+        } else {
+            // decode with the same fixed charset that setOffsetInZooKeeper() encodes with
+            String asString = new String(data, StandardCharsets.UTF_8);
+            if (asString.length() == 0) {
+                return null;
+            } else {
+                try {
+                    return Long.valueOf(asString);
+                }
+                catch (NumberFormatException e) {
+                    LOG.error(
+                            "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
+                            groupId, topic, partition, asString);
+                    return null;
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
new file mode 100644
index 0000000..fabb0fe
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for the Kafka 0.8 consumer. Most tests delegate to the shared test
+ * topologies of {@link KafkaConsumerTestBase}; the ZooKeeper-specific tests are implemented here.
+ */
+public class Kafka08ITCase extends KafkaConsumerTestBase {
+
+    // ------------------------------------------------------------------------
+    //  Suite of Tests
+    // ------------------------------------------------------------------------
+
+    @Test(timeout = 60000)
+    public void testFailOnNoBroker() throws Exception {
+        runFailOnNoBrokerTest();
+    }
+
+
+    @Test(timeout = 60000)
+    public void testConcurrentProducerConsumerTopology() throws Exception {
+        runSimpleConcurrentProducerConsumerTopology();
+    }
+
+//    @Test(timeout = 60000)
+//    public void testPunctuatedExplicitWMConsumer() throws Exception {
+//        runExplicitPunctuatedWMgeneratingConsumerTest(false);
+//    }
+
+//    @Test(timeout = 60000)
+//    public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
+//        runExplicitPunctuatedWMgeneratingConsumerTest(true);
+//    }
+
+    @Test(timeout = 60000)
+    public void testKeyValueSupport() throws Exception {
+        runKeyValueTest();
+    }
+
+    // --- canceling / failures ---
+
+    @Test(timeout = 60000)
+    public void testCancelingEmptyTopic() throws Exception {
+        runCancelingOnEmptyInputTest();
+    }
+
+    @Test(timeout = 60000)
+    public void testCancelingFullTopic() throws Exception {
+        runCancelingOnFullInputTest();
+    }
+
+    @Test(timeout = 60000)
+    public void testFailOnDeploy() throws Exception {
+        runFailOnDeployTest();
+    }
+
+    /**
+     * Stores an offset in ZooKeeper that lies beyond the 20 records of the topic and verifies
+     * that the full sequence can still be read.
+     */
+    @Test(timeout = 60000)
+    public void testInvalidOffset() throws Exception {
+
+        final int parallelism = 1;
+
+        // write 20 messages into topic:
+        final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1);
+
+        // set invalid offset (the client is closed even if setting the offset fails):
+        try (CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient()) {
+            ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234);
+        }
+
+        // read from topic
+        final int valuesCount = 20;
+        final int startFrom = 0;
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+        env.getConfig().disableSysoutLogging();
+
+        readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);
+
+        deleteTestTopic(topic);
+    }
+
+    // --- source to partition mappings and exactly once ---
+
+    @Test(timeout = 60000)
+    public void testOneToOneSources() throws Exception {
+        runOneToOneExactlyOnceTest();
+    }
+
+    @Test(timeout = 60000)
+    public void testOneSourceMultiplePartitions() throws Exception {
+        runOneSourceMultiplePartitionsExactlyOnceTest();
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleSourcesOnePartition() throws Exception {
+        runMultipleSourcesOnePartitionExactlyOnceTest();
+    }
+
+    // --- broker failure ---
+
+    @Test(timeout = 60000)
+    public void testBrokerFailure() throws Exception {
+        runBrokerFailureTest();
+    }
+
+    // --- offset committing ---
+
+    @Test(timeout = 60000)
+    public void testCommitOffsetsToZookeeper() throws Exception {
+        runCommitOffsetsToKafka();
+    }
+
+    @Test(timeout = 60000)
+    public void testStartFromZookeeperCommitOffsets() throws Exception {
+        runStartFromKafkaCommitOffsets();
+    }
+
+    @Test(timeout = 60000)
+    public void testAutoOffsetRetrievalAndCommitToZookeeper() throws Exception {
+        runAutoOffsetRetrievalAndCommitToKafka();
+    }
+
+    /**
+     * Writes a random offset to ZooKeeper through the offset handler's static helper and checks
+     * that reading it back yields the same value.
+     *
+     * <p>Exceptions are propagated to the test framework, so that a failure is reported with its
+     * full stack trace.
+     */
+    @Test
+    public void runOffsetManipulationInZooKeeperTest() throws Exception {
+        final String topicName = "ZookeeperOffsetHandlerTest-Topic";
+        final String groupId = "ZookeeperOffsetHandlerTest-Group";
+
+        final Long offset = (long) (Math.random() * Long.MAX_VALUE);
+
+        final Long fetchedOffset;
+        try (CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient()) {
+            kafkaServer.createTestTopic(topicName, 3, 2);
+
+            ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
+
+            fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
+        }
+
+        assertEquals(offset, fetchedOffset);
+    }
+
+    /**
+     * Reads a topic without checkpointing and verifies that the periodic auto-commit stored an
+     * offset in ZooKeeper for at least one of the three partitions.
+     */
+    @Test(timeout = 60000)
+    public void testOffsetAutocommitTest() throws Exception {
+        final int parallelism = 3;
+
+        // write a sequence from 0 to 99 to each of the 3 partitions.
+        final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+        // NOTE: We are not enabling the checkpointing!
+        env.getConfig().disableSysoutLogging();
+        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(parallelism);
+
+        // the readSequence operation sleeps for 20 ms between each record.
+        // setting a delay of 25*20 = 500 for the commit interval makes
+        // sure that we commit roughly 3-4 times while reading, however
+        // at least once.
+        Properties readProps = new Properties();
+        readProps.putAll(standardProps);
+        readProps.setProperty("auto.commit.interval.ms", "500");
+
+        // read so that the offset can be committed to ZK
+        readSequence(env, readProps, parallelism, topicName, 100, 0);
+
+        // get the offset (the client is closed even if one of the lookups fails)
+        final String groupId = standardProps.getProperty("group.id");
+        final Long o1;
+        final Long o2;
+        final Long o3;
+        try (CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient()) {
+            o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
+            o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 1);
+            o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 2);
+        }
+        LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
+
+        // ensure that the offset has been committed
+        boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) ||
+                (o2 != null && o2 > 0 && o2 <= 100) ||
+                (o3 != null && o3 > 0 && o3 <= 100);
+        assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet);
+
+        deleteTestTopic(topicName);
+    }
+
+    // --- special executions ---
+
+    @Test(timeout = 60000)
+    public void testBigRecordJob() throws Exception {
+        runBigRecordTestTopology();
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleTopics() throws Exception {
+        runProduceConsumeMultipleTopics();
+    }
+
+    @Test(timeout = 60000)
+    public void testAllDeletes() throws Exception {
+        runAllDeletesTest();
+    }
+
+    @Test(timeout=60000)
+    public void testEndOfStream() throws Exception {
+        runEndOfStreamTest();
+    }
+
+    @Test(timeout = 60000)
+    public void testMetrics() throws Throwable {
+        runMetricsTest();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
new file mode 100644
index 0000000..6d0b140
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Binds the checks of {@link KafkaTableSinkTestBase} to {@link Kafka08JsonTableSink}.
+ */
+public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+    /**
+     * Builds a {@link Kafka08JsonTableSink} whose producer factory method is overridden to
+     * hand back the {@code kafkaProducer} passed to this method instead of creating a new one.
+     */
+    @Override
+    protected KafkaTableSink createTableSink(
+            String topic,
+            Properties properties,
+            KafkaPartitioner<Row> partitioner,
+            final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+        KafkaTableSink sinkUnderTest = new Kafka08JsonTableSink(topic, properties, partitioner) {
+            @Override
+            protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+                    String topic,
+                    Properties properties,
+                    SerializationSchema<Row> serializationSchema,
+                    KafkaPartitioner<Row> partitioner) {
+                // ignore the arguments and return the producer captured from the enclosing call
+                return kafkaProducer;
+            }
+        };
+        return sinkUnderTest;
+    }
+
+    /**
+     * Returns a {@link JsonRowSerializationSchema} constructed from {@code FIELD_NAMES}.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    protected SerializationSchema<Row> getSerializationSchema() {
+        SerializationSchema<Row> jsonSchema = new JsonRowSerializationSchema(FIELD_NAMES);
+        return jsonSchema;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
new file mode 100644
index 0000000..a2d66ac
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+ @Override
+ protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+ return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+ return (Class) JsonRowDeserializationSchema.class;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+ return (Class) FlinkKafkaConsumer08.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
new file mode 100644
index 0000000..5c951db
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+/**
+ * Producer integration test class of the Kafka 0.8 connector module; the test logic itself
+ * lives outside this class and is only invoked from here.
+ */
+@SuppressWarnings("serial")
+public class Kafka08ProducerITCase extends KafkaProducerTestBase {
+
+ /**
+  * Delegates to {@code runCustomPartitioningTest()} (not defined in this class, presumably
+  * inherited from {@link KafkaProducerTestBase}). No JUnit timeout is configured for this test.
+  */
+ @Test
+ public void testCustomPartitioning() {
+ runCustomPartitioningTest();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
new file mode 100644
index 0000000..9520f55
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.Test;
+
+/**
+ * Tests for the configuration handling of {@link FlinkKafkaConsumer08}: validation of the
+ * ZooKeeper/group settings and the behavior of {@code open()} for different bootstrap servers.
+ */
+public class KafkaConsumer08Test {
+
+    /**
+     * Message fragment expected when none of the configured bootstrap servers is valid.
+     * Both bootstrap-server tests use this single constant so that the negative test checks
+     * for exactly the text the positive test expects.
+     */
+    private static final String ALL_SERVERS_INVALID_MESSAGE =
+        "All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid";
+
+    /**
+     * Verifies that {@code validateZooKeeperConfig} throws an {@link IllegalArgumentException}
+     * for empty properties, for properties without a connect string, and for properties
+     * without a group id.
+     */
+    @Test
+    public void testValidateZooKeeperConfig() {
+        try {
+            // empty
+            Properties emptyProperties = new Properties();
+            try {
+                FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
+                fail("should fail with an exception");
+            }
+            catch (IllegalArgumentException e) {
+                // expected
+            }
+
+            // no connect string (only group string)
+            Properties noConnect = new Properties();
+            noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
+            try {
+                FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
+                fail("should fail with an exception");
+            }
+            catch (IllegalArgumentException e) {
+                // expected
+            }
+
+            // no group string (only connect string)
+            Properties noGroup = new Properties();
+            noGroup.put("zookeeper.connect", "localhost:47574");
+            try {
+                FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
+                fail("should fail with an exception");
+            }
+            catch (IllegalArgumentException e) {
+                // expected
+            }
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Opens a consumer configured with addresses where no cluster is expected to run and
+     * verifies that {@code open()} fails with an "Unable to retrieve any partitions" message.
+     */
+    @Test
+    public void testCreateSourceWithoutCluster() {
+        try {
+            Properties props = new Properties();
+            props.setProperty("zookeeper.connect", "localhost:56794");
+            props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
+            props.setProperty("group.id", "non-existent-group");
+
+            FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+            consumer.open(new Configuration());
+            fail();
+        }
+        catch (Exception e) {
+            assertTrue("Unexpected exception message: " + e.getMessage(),
+                messageContains(e, "Unable to retrieve any partitions"));
+        }
+    }
+
+    /**
+     * Verifies that {@code open()} fails with the "all servers invalid" message when the only
+     * configured bootstrap server uses the host name {@code indexistentHost}.
+     */
+    @Test
+    public void testAllBoostrapServerHostsAreInvalid() {
+        try {
+            String zookeeperConnect = "localhost:56794";
+            String bootstrapServers = "indexistentHost:11111";
+            String groupId = "non-existent-group";
+            Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
+            FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
+                    new SimpleStringSchema(), props);
+            consumer.open(new Configuration());
+            fail();
+        } catch (Exception e) {
+            assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!",
+                    messageContains(e, ALL_SERVERS_INVALID_MESSAGE));
+        }
+    }
+
+    /**
+     * Verifies that {@code open()} does not report "all servers invalid" when at least one of
+     * the configured bootstrap servers uses a valid host ({@code localhost}). The call is still
+     * expected to throw (the test fails otherwise), just for a different reason.
+     */
+    @Test
+    public void testAtLeastOneBootstrapServerHostIsValid() {
+        try {
+            String zookeeperConnect = "localhost:56794";
+            // we declare one valid bootstrap server, namely the one with
+            // 'localhost'
+            String bootstrapServers = "indexistentHost:11111, localhost:22222";
+            String groupId = "non-existent-group";
+            Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
+            FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
+                    new SimpleStringSchema(), props);
+            consumer.open(new Configuration());
+            fail();
+        } catch (Exception e) {
+            // an exception is fine here as long as it is not the 'all servers invalid' one,
+            // because one valid bootstrap server was configured
+            assertTrue("The cause of the exception should not be 'all boostrap server are invalid'!",
+                    !messageContains(e, ALL_SERVERS_INVALID_MESSAGE));
+        }
+    }
+
+    /**
+     * Creates consumer properties holding the given ZooKeeper connect string, bootstrap
+     * servers and group id.
+     */
+    private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) {
+        Properties props = new Properties();
+        props.setProperty("zookeeper.connect", zookeeperConnect);
+        props.setProperty("bootstrap.servers", bootstrapServers);
+        props.setProperty("group.id", groupId);
+        return props;
+    }
+
+    /**
+     * Null-safe check whether the message of {@code e} contains {@code fragment}; an
+     * exception without a message never matches.
+     */
+    private static boolean messageContains(Exception e, String fragment) {
+        String message = e.getMessage();
+        return message != null && message.contains(fragment);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
new file mode 100644
index 0000000..72d2772
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Time} implementation that reads the JVM clocks and sleeps on the calling thread.
+ */
+public class KafkaLocalSystemTime implements Time {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
+
+    /**
+     * @return the value of {@link System#currentTimeMillis()}
+     */
+    @Override
+    public long milliseconds() {
+        return System.currentTimeMillis();
+    }
+
+    /**
+     * @return the value of {@link System#nanoTime()}
+     */
+    @Override
+    public long nanoseconds() {
+        return System.nanoTime();
+    }
+
+    /**
+     * Sleeps the calling thread for {@code ms} milliseconds. If the thread is interrupted,
+     * the interruption is logged, the method returns early, and the thread's interrupt
+     * status is restored so that callers can still observe it.
+     *
+     * @param ms the time to sleep, in milliseconds
+     */
+    @Override
+    public void sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+        } catch (InterruptedException e) {
+            LOG.warn("Interruption", e);
+            // Thread.sleep clears the interrupt status when it throws; set it again
+            // instead of silently dropping the interruption request
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..91fc286
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.concurrent.Future;
+
+
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducerBase.class)
+public class KafkaProducerTest extends TestLogger {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPropagateExceptions() {
+ try {
+ // mock kafka producer
+ KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
+
+ // partition setup
+ when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+ // returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
+ Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
+
+ // failure when trying to send an element
+ when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
+ .thenAnswer(new Answer<Future<RecordMetadata>>() {
+ @Override
+ public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+ Callback callback = (Callback) invocation.getArguments()[1];
+ callback.onCompletion(null, new Exception("Test error"));
+ return null;
+ }
+ });
+
+ // make sure the FlinkKafkaProducer instantiates our mock producer
+ whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+
+ // (1) producer that propagates errors
+
+ FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
+ "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
+
+ testHarness.open();
+
+ try {
+ testHarness.processElement(new StreamRecord<>("value"));
+ testHarness.processElement(new StreamRecord<>("value"));
+ fail("This should fail with an exception");
+ }
+ catch (Exception e) {
+ assertNotNull(e.getCause());
+ assertNotNull(e.getCause().getMessage());
+ assertTrue(e.getCause().getMessage().contains("Test error"));
+ }
+
+ testHarness.close();
+
+ // (2) producer that only logs errors
+
+ FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
+ "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+ producerLogging.setLogFailuresOnly(true);
+
+ testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("value"));
+ testHarness.processElement(new StreamRecord<>("value"));
+
+ testHarness.close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
new file mode 100644
index 0000000..c28799c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+/**
+ * Short-retention integration test class of the Kafka 0.8 connector module; each test only
+ * invokes a {@code run...} method that is not defined in this class (presumably inherited
+ * from {@link KafkaShortRetentionTestBase}).
+ */
+@SuppressWarnings("serial")
+public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
+
+ /**
+  * Delegates to {@code runAutoOffsetResetTest()}; JUnit aborts the test after 60000 ms.
+  */
+ @Test(timeout=60000)
+ public void testAutoOffsetReset() throws Exception {
+ runAutoOffsetResetTest();
+ }
+
+ /**
+  * Delegates to {@code runFailOnAutoOffsetResetNoneEager()}; JUnit aborts the test after 60000 ms.
+  */
+ @Test(timeout=60000)
+ public void testAutoOffsetResetNone() throws Exception {
+ runFailOnAutoOffsetResetNoneEager();
+ }
+}