Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:01 UTC
[11/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
deleted file mode 100644
index d015157..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.common.TopicAndPartition;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.kafka.common.Node;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level consumer API.
- * The fetcher also handles the explicit communication with ZooKeeper to fetch initial offsets
- * and to write offsets to ZooKeeper.
- *
- * @param <T> The type of elements produced by the fetcher.
- */
-public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
-
- static final KafkaTopicPartitionState<TopicAndPartition> MARKER =
- new KafkaTopicPartitionState<>(new KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1));
-
- private static final Logger LOG = LoggerFactory.getLogger(Kafka08Fetcher.class);
-
- // ------------------------------------------------------------------------
-
- /** The schema to convert between Kafka's byte messages and Flink's objects */
- private final KeyedDeserializationSchema<T> deserializer;
-
- /** The properties that configure the Kafka connection */
- private final Properties kafkaConfig;
-
- /** The subtask's runtime context */
- private final RuntimeContext runtimeContext;
-
- /** The queue of partitions that are currently not assigned to a broker connection */
- private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
-
- /** The behavior to use in case an offset is not valid (any more) for a partition */
- private final long invalidOffsetBehavior;
-
- /** The interval in which to automatically commit (-1 if deactivated) */
- private final long autoCommitInterval;
-
- /** The handler that reads/writes offsets from/to ZooKeeper */
- private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
-
- /** Flag to track the main work loop as alive */
- private volatile boolean running = true;
-
-
- public Kafka08Fetcher(
- SourceContext<T> sourceContext,
- List<KafkaTopicPartition> assignedPartitions,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
- SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- StreamingRuntimeContext runtimeContext,
- KeyedDeserializationSchema<T> deserializer,
- Properties kafkaProperties,
- long invalidOffsetBehavior,
- long autoCommitInterval,
- boolean useMetrics) throws Exception
- {
- super(
- sourceContext,
- assignedPartitions,
- watermarksPeriodic,
- watermarksPunctuated,
- runtimeContext.getProcessingTimeService(),
- runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
- runtimeContext.getUserCodeClassLoader(),
- useMetrics);
-
- this.deserializer = checkNotNull(deserializer);
- this.kafkaConfig = checkNotNull(kafkaProperties);
- this.runtimeContext = runtimeContext;
- this.invalidOffsetBehavior = invalidOffsetBehavior;
- this.autoCommitInterval = autoCommitInterval;
- this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
-
- // initially, all these partitions are not assigned to a specific broker connection
- for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
- unassignedPartitionsQueue.add(partition);
- }
- }
-
- // ------------------------------------------------------------------------
- // Main Work Loop
- // ------------------------------------------------------------------------
-
- @Override
- public void runFetchLoop() throws Exception {
- // the map from broker to the thread that is connected to that broker
- final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>();
-
- // this holds possible exceptions from the concurrent broker connection threads
- final ExceptionProxy errorHandler = new ExceptionProxy(Thread.currentThread());
-
- // the offset handler handles the communication with ZooKeeper, to commit externally visible offsets
- final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig);
- this.zookeeperOffsetHandler = zookeeperOffsetHandler;
-
- PeriodicOffsetCommitter periodicCommitter = null;
- try {
- // read offsets from ZooKeeper for partitions that did not restore offsets
- {
- List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
- for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
- if (!partition.isOffsetDefined()) {
- partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
- }
- }
-
- Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
- for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
- Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
- if (zkOffset != null) {
- // the offset in ZK represents the "next record to process", so we need to subtract 1 from it
- // to correctly represent our internally checkpointed offsets
- partition.setOffset(zkOffset - 1);
- }
- }
- }
-
- // start the periodic offset committer thread, if necessary
- if (autoCommitInterval > 0) {
- LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);
-
- periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler,
- subscribedPartitions(), errorHandler, autoCommitInterval);
- periodicCommitter.setName("Periodic Kafka partition offset committer");
- periodicCommitter.setDaemon(true);
- periodicCommitter.start();
- }
-
- // register offset metrics
- if (useMetrics) {
- final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
- addOffsetStateGauge(kafkaMetricGroup);
- }
-
- // Main loop polling elements from the unassignedPartitions queue to the threads
- while (running) {
- // re-throw any exception from the concurrent fetcher threads
- errorHandler.checkAndThrowException();
-
- // wait for max 5 seconds trying to get partitions to assign
- // if threads shut down, this poll returns earlier, because the threads inject the
- // special marker into the queue
- List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign =
- unassignedPartitionsQueue.getBatchBlocking(5000);
- partitionsToAssign.remove(MARKER);
-
- if (!partitionsToAssign.isEmpty()) {
- LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size());
- Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders =
- findLeaderForPartitions(partitionsToAssign, kafkaConfig);
-
- // assign the partitions to the leaders (maybe start the threads)
- for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader :
- partitionsWithLeaders.entrySet())
- {
- final Node leader = partitionsWithLeader.getKey();
- final List<KafkaTopicPartitionState<TopicAndPartition>> partitions = partitionsWithLeader.getValue();
- SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader);
-
- if (!running) {
- break;
- }
-
- if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
- // start new thread
- brokerThread = createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
- brokerToThread.put(leader, brokerThread);
- }
- else {
- // put elements into queue of thread
- ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue =
- brokerThread.getNewPartitionsQueue();
-
- for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
- if (!newPartitionsQueue.addIfOpen(fp)) {
- // we were unable to add the partition to the broker's queue
- // the broker has closed in the meantime (the thread will shut down)
- // create a new thread for connecting to this broker
- List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<>();
- seedPartitions.add(fp);
- brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
- brokerToThread.put(leader, brokerThread);
- newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions
- }
- }
- }
- }
- }
- else {
- // there were no partitions to assign. Check if any broker threads shut down.
- // We get into this section of the code if either the poll timed out, or the
- // blocking poll was woken up by the marker element
- Iterator<SimpleConsumerThread<T>> bttIterator = brokerToThread.values().iterator();
- while (bttIterator.hasNext()) {
- SimpleConsumerThread<T> thread = bttIterator.next();
- if (!thread.getNewPartitionsQueue().isOpen()) {
- LOG.info("Removing stopped consumer thread {}", thread.getName());
- bttIterator.remove();
- }
- }
- }
-
- if (brokerToThread.size() == 0 && unassignedPartitionsQueue.isEmpty()) {
- if (unassignedPartitionsQueue.close()) {
- LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher");
- break;
- }
- // we end up here if somebody added something to the queue in the meantime --> continue to poll queue again
- }
- }
- }
- catch (InterruptedException e) {
- // this may be thrown because an exception on one of the concurrent fetcher threads
- // woke this thread up. Make sure we throw the root exception instead in that case
- errorHandler.checkAndThrowException();
-
- // no other root exception, throw the interrupted exception
- throw e;
- }
- finally {
- this.running = false;
- this.zookeeperOffsetHandler = null;
-
- // if we run a periodic committer thread, shut that down
- if (periodicCommitter != null) {
- periodicCommitter.shutdown();
- }
-
- // clear the interruption flag
- // this allows the (best-effort) joining on the consumer threads to proceed
- // in case this thread was already interrupted earlier
- Thread.interrupted();
-
- // make sure that in any case (completion, abort, error), all spawned threads are stopped
- try {
- int runningThreads;
- do {
- // check whether threads are alive and cancel them
- runningThreads = 0;
- Iterator<SimpleConsumerThread<T>> threads = brokerToThread.values().iterator();
- while (threads.hasNext()) {
- SimpleConsumerThread<?> t = threads.next();
- if (t.isAlive()) {
- t.cancel();
- runningThreads++;
- } else {
- threads.remove();
- }
- }
-
- // wait for the threads to finish, before issuing a cancel call again
- if (runningThreads > 0) {
- for (SimpleConsumerThread<?> t : brokerToThread.values()) {
- t.join(500 / runningThreads + 1);
- }
- }
- }
- while (runningThreads > 0);
- }
- catch (InterruptedException ignored) {
- // waiting for the thread shutdown apparently got interrupted
- // restore interrupted state and continue
- Thread.currentThread().interrupt();
- }
- catch (Throwable t) {
- // we catch all here to preserve the original exception
- LOG.error("Exception while shutting down consumer threads", t);
- }
-
- try {
- zookeeperOffsetHandler.close();
- }
- catch (Throwable t) {
- // we catch all here to preserve the original exception
- LOG.error("Exception while shutting down ZookeeperOffsetHandler", t);
- }
- }
- }
-
- @Override
- public void cancel() {
- // signal the main thread to exit
- this.running = false;
-
- // make sure the main thread wakes up soon
- this.unassignedPartitionsQueue.addIfOpen(MARKER);
- }
-
- // ------------------------------------------------------------------------
- // Kafka 0.8 specific class instantiation
- // ------------------------------------------------------------------------
-
- @Override
- public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
- return new TopicAndPartition(partition.getTopic(), partition.getPartition());
- }
-
- // ------------------------------------------------------------------------
- // Offset handling
- // ------------------------------------------------------------------------
-
- @Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
- ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
- if (zkHandler != null) {
- // the ZK handler takes care of incrementing the offsets by 1 before committing
- zkHandler.prepareAndCommitOffsets(offsets);
- }
-
- // Set committed offsets in topic partition state
- KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitions();
- for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
- Long offset = offsets.get(partition.getKafkaTopicPartition());
- if (offset != null) {
- partition.setCommittedOffset(offset);
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
- List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
- Node leader,
- ExceptionProxy errorHandler) throws IOException, ClassNotFoundException
- {
- // each thread needs its own copy of the deserializer, because the deserializer is
- // not necessarily thread safe
- final KeyedDeserializationSchema<T> clonedDeserializer =
- InstantiationUtil.clone(deserializer, runtimeContext.getUserCodeClassLoader());
-
- // seed thread with list of fetch partitions (otherwise it would shut down immediately again)
- SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(
- this, errorHandler, kafkaConfig, leader, seedPartitions, unassignedPartitionsQueue,
- clonedDeserializer, invalidOffsetBehavior);
-
- brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
- runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port()));
- brokerThread.setDaemon(true);
- brokerThread.start();
-
- LOG.info("Starting thread {}", brokerThread.getName());
- return brokerThread;
- }
-
- /**
- * Returns a list of unique topics for the given partitions.
- *
- * @param partitions The partitions
- * @return A list of unique topics
- */
- private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
- HashSet<String> uniqueTopics = new HashSet<>();
- for (KafkaTopicPartitionState<TopicAndPartition> fp: partitions) {
- uniqueTopics.add(fp.getTopic());
- }
- return new ArrayList<>(uniqueTopics);
- }
-
- /**
- * Find leaders for the partitions
- *
- * From a high level, the method does the following:
- * - Get a list of FetchPartitions (usually only a few partitions)
- * - Get the list of topics from the FetchPartitions list and request the partitions for the topics. (Kafka doesn't support getting leaders for a set of partitions)
- * - Build a Map<Leader, List<FetchPartition>> where only the requested partitions are contained.
- *
- * @param partitionsToAssign fetch partitions list
- * @return leader to partitions map
- */
- private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(
- List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign,
- Properties kafkaProperties) throws Exception
- {
- if (partitionsToAssign.isEmpty()) {
- throw new IllegalArgumentException("Leader request for empty partitions list");
- }
-
- LOG.info("Refreshing leader information for partitions {}", partitionsToAssign);
-
- // this request is based on the topic names
- PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(getTopics(partitionsToAssign), kafkaProperties);
- infoFetcher.start();
-
- // NOTE: The Kafka client apparently locks itself up sometimes
- // when it is interrupted, so we run it only in a separate thread.
- // Since it sometimes refuses to shut down, we resort to the admittedly harsh
- // means of killing the thread after a timeout.
- KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
- watchDog.start();
-
- // this list contains ALL partitions of the requested topics
- List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
-
- // copy list to track unassigned partitions
- List<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions = new ArrayList<>(partitionsToAssign);
-
- // final mapping from leader -> list(fetchPartition)
- Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions = new HashMap<>();
-
- for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
- if (unassignedPartitions.size() == 0) {
- // we are done: all partitions are assigned
- break;
- }
-
- Iterator<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsIterator = unassignedPartitions.iterator();
- while (unassignedPartitionsIterator.hasNext()) {
- KafkaTopicPartitionState<TopicAndPartition> unassignedPartition = unassignedPartitionsIterator.next();
-
- if (unassignedPartition.getKafkaTopicPartition().equals(partitionLeader.getTopicPartition())) {
- // we found the leader for one of the fetch partitions
- Node leader = partitionLeader.getLeader();
-
- List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = leaderToPartitions.get(leader);
- if (partitionsOfLeader == null) {
- partitionsOfLeader = new ArrayList<>();
- leaderToPartitions.put(leader, partitionsOfLeader);
- }
- partitionsOfLeader.add(unassignedPartition);
- unassignedPartitionsIterator.remove(); // partition has been assigned
- break;
- }
- }
- }
-
- if (unassignedPartitions.size() > 0) {
- throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions);
- }
-
- LOG.debug("Partitions with assigned leaders {}", leaderToPartitions);
-
- return leaderToPartitions;
- }
-}
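
A note on the offset arithmetic in the fetcher above: ZooKeeper (and Kafka fetch requests) use the offset of the next record to process, while Flink's checkpointed state tracks the last record emitted. That is why the code subtracts 1 when restoring from ZooKeeper and adds 1 when fetching or committing. A minimal standalone sketch of the round-trip (not part of the commit; the class and variable names are illustrative):

public class OffsetConventionSketch {

	public static void main(String[] args) {
		long zkOffset = 42L;                        // committed in ZooKeeper: next record to process
		long internalOffset = zkOffset - 1;         // restored into Flink state: last processed record
		long nextFetchOffset = internalOffset + 1;  // what a fetch request asks for

		// the two conversions round-trip, so fetching resumes exactly at the committed position
		System.out.println(nextFetchOffset == zkOffset); // prints "true"
	}
}
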
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
deleted file mode 100644
index 4d61e53..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-/**
- * A watch dog thread that forcibly kills another thread, if that thread does not
- * finish in time.
- *
- * <p>This uses the discouraged {@link Thread#stop()} method. While this is not
- * advisable, this watch dog is only for extreme cases of threads that simply
- * do not terminate otherwise.
- */
-class KillerWatchDog extends Thread {
-
- private final Thread toKill;
- private final long timeout;
-
- KillerWatchDog(Thread toKill, long timeout) {
- super("KillerWatchDog");
- setDaemon(true);
-
- this.toKill = toKill;
- this.timeout = timeout;
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void run() {
- final long deadline = System.currentTimeMillis() + timeout;
- long now;
-
- while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
- try {
- toKill.join(deadline - now);
- }
- catch (InterruptedException e) {
- // ignore here, our job is important!
- }
- }
-
- // this is harsh, but this watchdog is a last resort
- if (toKill.isAlive()) {
- toKill.stop();
- }
- }
-}
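
For context, a usage sketch of how such a watchdog pairs with a thread that refuses to die. This is a hypothetical class, assumed to live in the same package so the package-private KillerWatchDog is visible; note that Thread.stop() still existed in the JDKs current at the time of this commit, though it has since been removed:

public class WatchDogUsageSketch {

	public static void main(String[] args) throws InterruptedException {
		Thread stuck = new Thread(() -> {
			while (true) {
				// stands in for a blocking call that ignores interrupts
			}
		}, "stuck-worker");
		stuck.setDaemon(true);
		stuck.start();

		// forcibly stops the worker if it is still alive after one second
		new KillerWatchDog(stuck, 1000).start();

		stuck.join(); // returns once the watchdog has killed the thread
	}
}
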
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
deleted file mode 100644
index d8d927d..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-
-import java.util.List;
-import java.util.Properties;
-
-class PartitionInfoFetcher extends Thread {
-
- private final List<String> topics;
- private final Properties properties;
-
- private volatile List<KafkaTopicPartitionLeader> result;
- private volatile Throwable error;
-
-
- PartitionInfoFetcher(List<String> topics, Properties properties) {
- this.topics = topics;
- this.properties = properties;
- }
-
- @Override
- public void run() {
- try {
- result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
- }
- catch (Throwable t) {
- this.error = t;
- }
- }
-
- public List<KafkaTopicPartitionLeader> getPartitions() throws Exception {
- try {
- this.join();
- }
- catch (InterruptedException e) {
- throw new Exception("Partition fetching was cancelled before completion");
- }
-
- if (error != null) {
- throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error);
- }
- if (result != null) {
- return result;
- }
- throw new Exception("Partition fetching failed");
- }
-}
\ No newline at end of file
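
The fetcher-plus-watchdog combination above is essentially a blocking call moved onto a disposable thread with a hard timeout. A sketch of the same idea using java.util.concurrent primitives instead (a Future with a bounded get; this is a swapped-in alternative, not what the commit does, and the sleep merely stands in for the blocking metadata lookup):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GuardedLookupSketch {

	public static void main(String[] args) throws Exception {
		ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
			Thread t = new Thread(r, "partition-info-fetcher");
			t.setDaemon(true);
			return t;
		});

		Future<String> result = pool.submit(() -> {
			Thread.sleep(100); // stands in for the blocking partition metadata lookup
			return "partition metadata";
		});

		try {
			// the 60 second bound plays the role of the KillerWatchDog timeout
			System.out.println(result.get(60, TimeUnit.SECONDS));
		} catch (TimeoutException e) {
			result.cancel(true); // interrupts the worker instead of calling Thread.stop()
		} finally {
			pool.shutdownNow();
		}
	}
}
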
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
deleted file mode 100644
index 27d90f2..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.util.HashMap;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A thread that periodically writes the current Kafka partition offsets to Zookeeper.
- */
-public class PeriodicOffsetCommitter extends Thread {
-
- /** The ZooKeeper handler */
- private final ZookeeperOffsetHandler offsetHandler;
-
- private final KafkaTopicPartitionState<?>[] partitionStates;
-
- /** The proxy to forward exceptions to the main thread */
- private final ExceptionProxy errorHandler;
-
- /** Interval in which to commit, in milliseconds */
- private final long commitInterval;
-
- /** Flag to mark the periodic committer as running */
- private volatile boolean running = true;
-
- PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler,
- KafkaTopicPartitionState<?>[] partitionStates,
- ExceptionProxy errorHandler,
- long commitInterval)
- {
- this.offsetHandler = checkNotNull(offsetHandler);
- this.partitionStates = checkNotNull(partitionStates);
- this.errorHandler = checkNotNull(errorHandler);
- this.commitInterval = commitInterval;
-
- checkArgument(commitInterval > 0);
- }
-
- @Override
- public void run() {
- try {
- while (running) {
- Thread.sleep(commitInterval);
-
- // create a deep copy of the current offsets
- HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
- for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
- offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
- }
-
- offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
- }
- }
- catch (Throwable t) {
- if (running) {
- errorHandler.reportError(
- new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
- }
- }
- }
-
- public void shutdown() {
- this.running = false;
- this.interrupt();
- }
-}
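
The committer above is a classic sleep-then-commit loop. A compact sketch of the same schedule expressed with a ScheduledExecutorService (a swapped-in alternative for illustration; the Runnable stands in for prepareAndCommitOffsets on a snapshot of the partition states):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicCommitSketch {

	public static void main(String[] args) throws InterruptedException {
		ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

		// fixed delay mirrors the "sleep(commitInterval), then commit" semantics above
		scheduler.scheduleWithFixedDelay(
				() -> System.out.println("committing offset snapshot"),
				200, 200, TimeUnit.MILLISECONDS);

		Thread.sleep(700);       // let a few commits happen
		scheduler.shutdownNow(); // plays the role of PeriodicOffsetCommitter.shutdown()
	}
}
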
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
deleted file mode 100644
index 35e491a..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.ErrorMapping;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.kafka.common.Node;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static java.util.Objects.requireNonNull;
-import static org.apache.flink.util.PropertiesUtil.getInt;
-
-/**
- * This class implements a thread with a connection to a single Kafka broker. The thread
- * pulls records for a set of topic partitions for which the connected broker is currently
- * the leader. The thread deserializes these records and emits them.
- *
- * @param <T> The type of elements that this consumer thread creates from Kafka's byte messages
- * and emits into the Flink DataStream.
- */
-class SimpleConsumerThread<T> extends Thread {
-
- private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class);
-
- private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER;
-
- // ------------------------------------------------------------------------
-
- private final Kafka08Fetcher<T> owner;
-
- private final KeyedDeserializationSchema<T> deserializer;
-
- private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions;
-
- private final Node broker;
-
- /** Queue containing new fetch partitions for the consumer thread */
- private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue;
-
- private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions;
-
- private final ExceptionProxy errorHandler;
-
- private final long invalidOffsetBehavior;
-
- private volatile boolean running = true;
-
-
- // ----------------- Simple Consumer ----------------------
- private volatile SimpleConsumer consumer;
-
- private final int soTimeout;
- private final int minBytes;
- private final int maxWait;
- private final int fetchSize;
- private final int bufferSize;
- private final int reconnectLimit;
-
-
- // exceptions are thrown locally
- public SimpleConsumerThread(
- Kafka08Fetcher<T> owner,
- ExceptionProxy errorHandler,
- Properties config,
- Node broker,
- List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
- ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions,
- KeyedDeserializationSchema<T> deserializer,
- long invalidOffsetBehavior)
- {
- this.owner = owner;
- this.errorHandler = errorHandler;
- this.broker = broker;
- this.partitions = seedPartitions;
- this.deserializer = requireNonNull(deserializer);
- this.unassignedPartitions = requireNonNull(unassignedPartitions);
- this.newPartitionsQueue = new ClosableBlockingQueue<>();
- this.invalidOffsetBehavior = invalidOffsetBehavior;
-
- // these are the actual configuration values of Kafka + their original default values.
- this.soTimeout = getInt(config, "socket.timeout.ms", 30000);
- this.minBytes = getInt(config, "fetch.min.bytes", 1);
- this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
- this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
- this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
- this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
- }
-
- public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
- return newPartitionsQueue;
- }
-
- // ------------------------------------------------------------------------
- // main work loop
- // ------------------------------------------------------------------------
-
- @Override
- public void run() {
- LOG.info("Starting to fetch from {}", this.partitions);
-
- // set up the config values
- final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
-
- try {
- // create the Kafka consumer that we actually use for fetching
- consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
-
- // make sure that all partitions have some offsets to start with
- // those partitions that do not have an offset from a checkpoint need to get
- // their start offset from Kafka (via getLastOffsetFromKafka; the ZooKeeper lookup happened in the fetcher)
- getMissingOffsetsFromKafka(partitions);
-
- // Now, the actual work starts :-)
- int offsetOutOfRangeCount = 0;
- int reconnects = 0;
- while (running) {
-
- // ----------------------------------- partitions list maintenance ----------------------------
-
- // check queue for new partitions to read from:
- List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch();
- if (newPartitions != null) {
- // found some new partitions for this thread's broker
-
- // check if the new partitions need an offset lookup
- getMissingOffsetsFromKafka(newPartitions);
-
- // add the new partitions (and check they are not already in there)
- for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
- if (partitions.contains(newPartition)) {
- throw new IllegalStateException("Adding partition " + newPartition +
- " to subscribed partitions even though it is already subscribed");
- }
- partitions.add(newPartition);
- }
-
- LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName());
- LOG.debug("Partitions list: {}", newPartitions);
- }
-
- if (partitions.size() == 0) {
- if (newPartitionsQueue.close()) {
- // close succeeded. Closing thread
- running = false;
-
- LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.",
- getName());
-
- // add the wake-up marker into the queue to make the main thread
- // wake up immediately and terminate faster
- unassignedPartitions.add(MARKER);
-
- break;
- } else {
- // close failed: fetcher main thread concurrently added new partitions into the queue.
- // go to top of loop again and get the new partitions
- continue;
- }
- }
-
- // ----------------------------------- request / response with kafka ----------------------------
-
- FetchRequestBuilder frb = new FetchRequestBuilder();
- frb.clientId(clientId);
- frb.maxWait(maxWait);
- frb.minBytes(minBytes);
-
- for (KafkaTopicPartitionState<?> partition : partitions) {
- frb.addFetch(
- partition.getKafkaTopicPartition().getTopic(),
- partition.getKafkaTopicPartition().getPartition(),
- partition.getOffset() + 1, // request the next record
- fetchSize);
- }
-
- kafka.api.FetchRequest fetchRequest = frb.build();
- LOG.debug("Issuing fetch request {}", fetchRequest);
-
- FetchResponse fetchResponse;
- try {
- fetchResponse = consumer.fetch(fetchRequest);
- }
- catch (Throwable cce) {
- //noinspection ConstantConditions
- if (cce instanceof ClosedChannelException) {
- LOG.warn("Fetch failed because of ClosedChannelException.");
- LOG.debug("Full exception", cce);
-
- // we don't know if the broker is overloaded or unavailable.
- // retry a few times, then return ALL partitions for new leader lookup
- if (++reconnects >= reconnectLimit) {
- LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit);
- for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
- unassignedPartitions.add(fp);
- }
- this.partitions.clear();
- continue; // jump to top of loop: will close thread or subscribe to new partitions
- }
- try {
- consumer.close();
- } catch (Throwable t) {
- LOG.warn("Error while closing consumer connection", t);
- }
- // delay & retry
- Thread.sleep(100);
- consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
- continue; // retry
- } else {
- throw cce;
- }
- }
- reconnects = 0;
-
- // ---------------------------------------- error handling ----------------------------
-
- if (fetchResponse == null) {
- throw new IOException("Fetch from Kafka failed (request returned null)");
- }
-
- if (fetchResponse.hasError()) {
- String exception = "";
- List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
-
- // iterate over partitions to get individual error codes
- Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
- boolean partitionsRemoved = false;
-
- while (partitionsIterator.hasNext()) {
- final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
- short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition());
-
- if (code == ErrorMapping.OffsetOutOfRangeCode()) {
- // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
- // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
- partitionsToGetOffsetsFor.add(fp);
- }
- else if (code == ErrorMapping.NotLeaderForPartitionCode() ||
- code == ErrorMapping.LeaderNotAvailableCode() ||
- code == ErrorMapping.BrokerNotAvailableCode() ||
- code == ErrorMapping.UnknownCode())
- {
- // the broker we are connected to is not the leader for the partition.
- LOG.warn("{} is not the leader of {}. Reassigning leader for partition", broker, fp);
- LOG.debug("Error code = {}", code);
-
- unassignedPartitions.add(fp);
-
- partitionsIterator.remove(); // unsubscribe the partition ourselves
- partitionsRemoved = true;
- }
- else if (code != ErrorMapping.NoError()) {
- exception += "\nException for " + fp.getTopic() + ":" + fp.getPartition() + ": " +
- StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
- }
- }
- if (partitionsToGetOffsetsFor.size() > 0) {
- // safeguard against an infinite loop.
- if (offsetOutOfRangeCount++ > 3) {
- throw new RuntimeException("Found invalid offsets more than three times in partitions "
- + partitionsToGetOffsetsFor + ". Exceptions: " + exception);
- }
- // get valid offsets for these partitions and try again.
- LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
- getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
-
- LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
- continue; // jump back to create a new fetch request. The offset has not been touched.
- }
- else if (partitionsRemoved) {
- continue; // create new fetch request
- }
- else {
- // partitions failed on an error
- throw new IOException("Error while fetching from broker '" + broker +"': " + exception);
- }
- } else {
- // successful fetch, reset offsetOutOfRangeCount.
- offsetOutOfRangeCount = 0;
- }
-
- // ----------------------------------- process fetch response ----------------------------
-
- int messagesInFetch = 0;
- int deletedMessages = 0;
- Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
-
- partitionsLoop:
- while (partitionsIterator.hasNext()) {
- final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next();
-
- final ByteBufferMessageSet messageSet = fetchResponse.messageSet(
- currentPartition.getTopic(), currentPartition.getPartition());
-
- for (MessageAndOffset msg : messageSet) {
- if (running) {
- messagesInFetch++;
- final ByteBuffer payload = msg.message().payload();
- final long offset = msg.offset();
-
- if (offset <= currentPartition.getOffset()) {
- // we have seen this message already
- LOG.info("Skipping message with offset " + msg.offset()
- + " because we have seen messages until (including) "
- + currentPartition.getOffset()
- + " from topic/partition " + currentPartition.getTopic() + '/'
- + currentPartition.getPartition() + " already");
- continue;
- }
-
- // If the message value is null, this represents a delete command for the message key.
- // Log this and pass it on to the client who might want to also receive delete messages.
- byte[] valueBytes;
- if (payload == null) {
- deletedMessages++;
- valueBytes = null;
- } else {
- valueBytes = new byte[payload.remaining()];
- payload.get(valueBytes);
- }
-
- // put key into byte array
- byte[] keyBytes = null;
- int keySize = msg.message().keySize();
-
- if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
- ByteBuffer keyPayload = msg.message().key();
- keyBytes = new byte[keySize];
- keyPayload.get(keyBytes);
- }
-
- final T value = deserializer.deserialize(keyBytes, valueBytes,
- currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
- if (deserializer.isEndOfStream(value)) {
- // remove partition from subscribed partitions.
- partitionsIterator.remove();
- continue partitionsLoop;
- }
-
- owner.emitRecord(value, currentPartition, offset);
- }
- else {
- // no longer running
- return;
- }
- }
- }
- LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages);
- } // end of fetch loop
-
- if (!newPartitionsQueue.close()) {
- throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue.");
- }
- }
- catch (Throwable t) {
- // report to the fetcher's error handler
- errorHandler.reportError(t);
- }
- finally {
- if (consumer != null) {
- // closing the consumer should not fail the program
- try {
- consumer.close();
- }
- catch (Throwable t) {
- LOG.error("Error while closing the Kafka simple consumer", t);
- }
- }
- }
- }
-
- private void getMissingOffsetsFromKafka(
- List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
- {
- // collect which partitions we should fetch offsets for
- List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
- for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
- if (!part.isOffsetDefined()) {
- // retrieve the offset from the consumer
- partitionsToGetOffsetsFor.add(part);
- }
- }
-
- if (partitionsToGetOffsetsFor.size() > 0) {
- getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
-
- LOG.info("No checkpoint/savepoint offsets found for some partitions. " +
- "Fetched the following start offsets {}", partitionsToGetOffsetsFor);
- }
- }
-
- /**
- * Cancels this fetch thread. The thread will release all resources and terminate.
- */
- public void cancel() {
- this.running = false;
-
- // interrupt whatever the consumer is doing
- if (consumer != null) {
- consumer.close();
- }
-
- this.interrupt();
- }
-
- // ------------------------------------------------------------------------
- // Kafka Request Utils
- // ------------------------------------------------------------------------
-
- /**
- * Request latest offsets for a set of partitions, via a Kafka consumer.
- *
- * <p>This method retries three times if the response has an error.
- *
- * @param consumer The consumer connected to the lead broker
- * @param partitions The list of partitions we need offsets for
- * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
- */
- private static void getLastOffsetFromKafka(
- SimpleConsumer consumer,
- List<KafkaTopicPartitionState<TopicAndPartition>> partitions,
- long whichTime) throws IOException
- {
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
- for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
- requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1));
- }
-
- int retries = 0;
- OffsetResponse response;
- while (true) {
- kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
- requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
- response = consumer.getOffsetsBefore(request);
-
- if (response.hasError()) {
- StringBuilder exception = new StringBuilder();
- for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
- short code;
- if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) {
- exception.append("\nException for topic=").append(part.getTopic())
- .append(" partition=").append(part.getPartition()).append(": ")
- .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
- }
- }
- if (++retries >= 3) {
- throw new IOException("Unable to get last offset for partitions " + partitions + ": "
- + exception.toString());
- } else {
- LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception);
- }
- } else {
- break; // leave retry loop
- }
- }
-
- for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) {
- final long offset = response.offsets(part.getTopic(), part.getPartition())[0];
-
- // the offset returned is that of the next record to fetch. Because our state reflects the latest
- // successfully emitted record, we subtract one
- part.setOffset(offset - 1);
- }
- }
-}
\ No newline at end of file
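
One detail worth calling out in the message-processing loop above: a record with a null value payload is a Kafka tombstone (a delete marker for its key) and is passed through to the deserializer rather than dropped. A minimal sketch of that payload handling (illustrative names):

import java.nio.ByteBuffer;

public class PayloadDecodeSketch {

	static byte[] toBytes(ByteBuffer payload) {
		if (payload == null) {
			return null; // tombstone: delete marker for the key, forwarded as a null value
		}
		byte[] bytes = new byte[payload.remaining()];
		payload.get(bytes);
		return bytes;
	}

	public static void main(String[] args) {
		System.out.println(toBytes(null));                                      // null
		System.out.println(toBytes(ByteBuffer.wrap(new byte[] {1, 2})).length); // 2
	}
}
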
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
deleted file mode 100644
index 8f2ef09..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.utils.ZKGroupTopicDirs;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Handler for committing Kafka offsets to ZooKeeper and for retrieving them again.
- */
-public class ZookeeperOffsetHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
-
- private final String groupId;
-
- private final CuratorFramework curatorClient;
-
-
- public ZookeeperOffsetHandler(Properties props) {
- this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
- if (this.groupId == null) {
- throw new IllegalArgumentException("Required property '"
- + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
- }
-
- String zkConnect = props.getProperty("zookeeper.connect");
- if (zkConnect == null) {
- throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
- }
-
- // we use Curator's default timeouts
- int sessionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
- int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
-
- // undocumented config options allowing users to configure the retry policy (they are "flink." prefixed, as they are not official Kafka configs)
- int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
- int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
-
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
- curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
- curatorClient.start();
- }
-
- // ------------------------------------------------------------------------
- // Offset access and manipulation
- // ------------------------------------------------------------------------
-
- /**
- * Commits offsets for Kafka partitions to ZooKeeper. The offsets given to this method should be the offsets of
- * the last processed records; this method will take care of incrementing the offsets by 1 before committing them, so
- * that the offsets committed to ZooKeeper represent the next record to process.
- *
- * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit.
- * @throws Exception The method forwards exceptions.
- */
- public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception {
- for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
- KafkaTopicPartition tp = entry.getKey();
-
- Long lastProcessedOffset = entry.getValue();
- if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
- setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
- }
- }
- }
-
- /**
- * @param partitions The partitions to read offsets for.
- * @return The mapping from partition to offset.
- * @throws Exception This method forwards exceptions.
- */
- public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
- Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
- for (KafkaTopicPartition tp : partitions) {
- Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
-
- if (offset != null) {
- LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.",
- tp.getTopic(), tp.getPartition(), offset);
- ret.put(tp, offset);
- }
- }
- return ret;
- }
-
- /**
- * Closes the offset handler.
- *
- * @throws IOException Thrown, if the handler cannot be closed properly.
- */
- public void close() throws IOException {
- curatorClient.close();
- }
-
- // ------------------------------------------------------------------------
- // Communication with Zookeeper
- // ------------------------------------------------------------------------
-
- public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
- ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
- String path = topicDirs.consumerOffsetDir() + "/" + partition;
- curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
- byte[] data = Long.toString(offset).getBytes();
- curatorClient.setData().forPath(path, data);
- }
-
- public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
- ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
- String path = topicDirs.consumerOffsetDir() + "/" + partition;
- curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
-
- byte[] data = curatorClient.getData().forPath(path);
-
- if (data == null) {
- return null;
- } else {
- String asString = new String(data);
- if (asString.length() == 0) {
- return null;
- } else {
- try {
- return Long.valueOf(asString);
- }
- catch (NumberFormatException e) {
- LOG.error(
- "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
- groupId, topic, partition, asString);
- return null;
- }
- }
- }
- }
-}
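
To make the ZooKeeper layout concrete: ZKGroupTopicDirs resolves the consumer offset directory to Kafka's conventional /consumers/<group>/offsets/<topic> path, and the handler stores each offset as a decimal string in one znode per partition. A small sketch of the resulting path and payload (assuming that conventional layout; no live ZooKeeper needed):

public class ZkOffsetPathSketch {

	public static void main(String[] args) {
		String groupId = "my-group";   // illustrative values
		String topic = "my-topic";
		int partition = 3;
		long lastProcessed = 1234L;

		// the handler commits lastProcessed + 1 (the next record to process)
		String path = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
		byte[] data = Long.toString(lastProcessed + 1).getBytes();

		System.out.println(path);             // /consumers/my-group/offsets/my-topic/3
		System.out.println(new String(data)); // 1235
	}
}
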
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
deleted file mode 100644
index fabb0fe..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class Kafka08ITCase extends KafkaConsumerTestBase {
-
- // ------------------------------------------------------------------------
- // Suite of Tests
- // ------------------------------------------------------------------------
-
- @Test(timeout = 60000)
- public void testFailOnNoBroker() throws Exception {
- runFailOnNoBrokerTest();
- }
-
-
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
- runSimpleConcurrentProducerConsumerTopology();
- }
-
-// @Test(timeout = 60000)
-// public void testPunctuatedExplicitWMConsumer() throws Exception {
-// runExplicitPunctuatedWMgeneratingConsumerTest(false);
-// }
-
-// @Test(timeout = 60000)
-// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-// runExplicitPunctuatedWMgeneratingConsumerTest(true);
-// }
-
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
- runKeyValueTest();
- }
-
- // --- canceling / failures ---
-
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
- runCancelingOnEmptyInputTest();
- }
-
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
- runCancelingOnFullInputTest();
- }
-
- @Test(timeout = 60000)
- public void testFailOnDeploy() throws Exception {
- runFailOnDeployTest();
- }
-
- @Test(timeout = 60000)
- public void testInvalidOffset() throws Exception {
-
- final int parallelism = 1;
-
- // write 20 messages into topic:
- final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1);
-
- // set an invalid offset in ZooKeeper (beyond the 20 messages just written):
- CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
- ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234);
- curatorClient.close();
-
- // read from topic
- final int valuesCount = 20;
- final int startFrom = 0;
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.getConfig().disableSysoutLogging();
-
- readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);
-
- deleteTestTopic(topic);
- }
-
- // --- source to partition mappings and exactly once ---
-
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
- runOneToOneExactlyOnceTest();
- }
-
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
- runOneSourceMultiplePartitionsExactlyOnceTest();
- }
-
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
- runMultipleSourcesOnePartitionExactlyOnceTest();
- }
-
- // --- broker failure ---
-
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
- runBrokerFailureTest();
- }
-
- // --- offset committing ---
-
- @Test(timeout = 60000)
- public void testCommitOffsetsToZookeeper() throws Exception {
- runCommitOffsetsToKafka();
- }
-
- @Test(timeout = 60000)
- public void testStartFromZookeeperCommitOffsets() throws Exception {
- runStartFromKafkaCommitOffsets();
- }
-
- @Test(timeout = 60000)
- public void testAutoOffsetRetrievalAndCommitToZookeeper() throws Exception {
- runAutoOffsetRetrievalAndCommitToKafka();
- }
-
- @Test
- public void testOffsetManipulationInZooKeeper() {
- try {
- final String topicName = "ZookeeperOffsetHandlerTest-Topic";
- final String groupId = "ZookeeperOffsetHandlerTest-Group";
-
- final Long offset = (long) (Math.random() * Long.MAX_VALUE);
-
- CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
- kafkaServer.createTestTopic(topicName, 3, 2);
-
- ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
-
- Long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
-
- curatorFramework.close();
-
- assertEquals(offset, fetchedOffset);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test(timeout = 60000)
- public void testOffsetAutocommit() throws Exception {
- final int parallelism = 3;
-
- // write a sequence from 0 to 99 to each of the 3 partitions.
- final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- // NOTE: We are not enabling the checkpointing!
- env.getConfig().disableSysoutLogging();
- env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
- env.setParallelism(parallelism);
-
- // the readSequence operation sleeps for 20 ms between records;
- // a commit interval of 25 * 20 = 500 ms therefore makes us commit
- // roughly 3-4 times while reading, but at least once.
- Properties readProps = new Properties();
- readProps.putAll(standardProps);
- readProps.setProperty("auto.commit.interval.ms", "500");
-
- // read so that the offset can be committed to ZK
- readSequence(env, readProps, parallelism, topicName, 100, 0);
-
- // get the offset
- CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-
- Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
- Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
- Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
- curatorFramework.close();
- LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
-
- // ensure that the offset has been committed
- boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) ||
- (o2 != null && o2 > 0 && o2 <= 100) ||
- (o3 != null && o3 > 0 && o3 <= 100);
- assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet);
-
- deleteTestTopic(topicName);
- }
-
- // --- special executions ---
-
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
- runBigRecordTestTopology();
- }
-
- @Test(timeout = 60000)
- public void testMultipleTopics() throws Exception {
- runProduceConsumeMultipleTopics();
- }
-
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
- runAllDeletesTest();
- }
-
- @Test(timeout = 60000)
- public void testEndOfStream() throws Exception {
- runEndOfStreamTest();
- }
-
- @Test(timeout = 60000)
- public void testMetrics() throws Throwable {
- runMetricsTest();
- }
-}
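
The auto-commit behaviour verified by testOffsetAutocommit above reduces to the following sketch (topic name and addresses are placeholders): without checkpointing enabled, the 0.8 consumer commits offsets to ZooKeeper on the configured auto-commit interval.

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AutoCommitSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE: checkpointing is deliberately not enabled, as in the test above

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");  // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
        props.setProperty("group.id", "auto-commit-group");
        props.setProperty("auto.commit.interval.ms", "500");       // commit roughly twice a second

        env.addSource(new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props))
            .print();

        env.execute("auto-commit sketch");
    }
}
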
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
deleted file mode 100644
index 6d0b140..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
-
- @Override
- protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
- return new Kafka08JsonTableSink(topic, properties, partitioner) {
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
- SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return kafkaProducer;
- }
- };
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected SerializationSchema<Row> getSerializationSchema() {
- return new JsonRowSerializationSchema(FIELD_NAMES);
- }
-}
-
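
For orientation, constructing the sink under test looks roughly as follows (broker address, topic name, and the FixedPartitioner choice are assumptions of this sketch):

import java.util.Properties;
import org.apache.flink.api.table.Row;
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;

public class TableSinkSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        // rows are serialized as JSON and written to 'output-topic'
        Kafka08JsonTableSink sink =
                new Kafka08JsonTableSink("output-topic", props, new FixedPartitioner<Row>());

        // in a Table API program the sink is then attached to a Table,
        // e.g. via Table#writeToSink(sink) in the 1.1/1.2-era API
    }
}
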
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
deleted file mode 100644
index a2d66ac..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import java.util.Properties;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
-
- @Override
- protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
- return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
- return (Class) JsonRowDeserializationSchema.class;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
- return (Class) FlinkKafkaConsumer08.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
deleted file mode 100644
index 5c951db..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class Kafka08ProducerITCase extends KafkaProducerTestBase {
-
- @Test
- public void testCustomPartitioning() {
- runCustomPartitioningTest();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
deleted file mode 100644
index 9520f55..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Collections;
-import java.util.Properties;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.Test;
-
-public class KafkaConsumer08Test {
-
- @Test
- public void testValidateZooKeeperConfig() {
- try {
- // empty
- Properties emptyProperties = new Properties();
- try {
- FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
- fail("should fail with an exception");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
-
- // no connect string (only group string)
- Properties noConnect = new Properties();
- noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
- try {
- FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
- fail("should fail with an exception");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
-
- // no group string (only connect string)
- Properties noGroup = new Properties();
- noGroup.put("zookeeper.connect", "localhost:47574");
- try {
- FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
- fail("should fail with an exception");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testCreateSourceWithoutCluster() {
- try {
- Properties props = new Properties();
- props.setProperty("zookeeper.connect", "localhost:56794");
- props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
- props.setProperty("group.id", "non-existent-group");
-
- FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
- consumer.open(new Configuration());
- fail();
- }
- catch (Exception e) {
- assertTrue(e.getMessage().contains("Unable to retrieve any partitions"));
- }
- }
-
- @Test
- public void testAllBootstrapServerHostsAreInvalid() {
- try {
- String zookeeperConnect = "localhost:56794";
- String bootstrapServers = "indexistentHost:11111";
- String groupId = "non-existent-group";
- Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
- FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
- new SimpleStringSchema(), props);
- consumer.open(new Configuration());
- fail();
- } catch (Exception e) {
- assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!",
- e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
- + "' config are invalid"));
- }
- }
-
- @Test
- public void testAtLeastOneBootstrapServerHostIsValid() {
- try {
- String zookeeperConnect = "localhost:56794";
- // we declare one valid bootstrap server, namely the one on 'localhost'
- String bootstrapServers = "non-existent-host:11111, localhost:22222";
- String groupId = "non-existent-group";
- Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
- FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
- new SimpleStringSchema(), props);
- consumer.open(new Configuration());
- fail();
- } catch (Exception e) {
- // the test does not fail here because we have one valid bootstrap server
- assertTrue("The cause of the exception should not be 'all bootstrap servers are invalid'!",
- !e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
- + "' config are invalid"));
- }
- }
-
- private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) {
- Properties props = new Properties();
- props.setProperty("zookeeper.connect", zookeeperConnect);
- props.setProperty("bootstrap.servers", bootstrapServers);
- props.setProperty("group.id", groupId);
- return props;
- }
-}
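
The contract exercised by testValidateZooKeeperConfig is simply that both 'zookeeper.connect' and 'group.id' must be present; the consumer applies the same check when it is created. A minimal sketch of a configuration that passes (addresses are placeholders; as testCreateSourceWithoutCluster shows, constructing the consumer does not yet contact the cluster):

import java.util.Collections;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ZooKeeperConfigSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder
        props.setProperty("group.id", "my-group");
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        // with 'zookeeper.connect' or 'group.id' missing, this would throw
        // an IllegalArgumentException, as the tests above assert
        FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
                Collections.singletonList("my-topic"), new SimpleStringSchema(), props);

        System.out.println("created " + consumer);
    }
}
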
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
deleted file mode 100644
index 72d2772..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaLocalSystemTime implements Time {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
- @Override
- public long milliseconds() {
- return System.currentTimeMillis();
- }
-
- @Override
- public long nanoseconds() {
- return System.nanoTime();
- }
-
- @Override
- public void sleep(long ms) {
- try {
- Thread.sleep(ms);
- } catch (InterruptedException e) {
- LOG.warn("Interruption", e);
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index 91fc286..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.concurrent.Future;
-
-
-import static org.mockito.Mockito.*;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import static org.junit.Assert.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducerBase.class)
-public class KafkaProducerTest extends TestLogger {
-
- @Test
- @SuppressWarnings("unchecked")
- public void testPropagateExceptions() {
- try {
- // mock kafka producer
- KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
-
- // partition setup
- when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
- // returning an unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
- Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
-
- // failure when trying to send an element
- when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
- .thenAnswer(new Answer<Future<RecordMetadata>>() {
- @Override
- public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
- Callback callback = (Callback) invocation.getArguments()[1];
- callback.onCompletion(null, new Exception("Test error"));
- return null;
- }
- });
-
- // make sure the FlinkKafkaProducer instantiates our mock producer
- whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-
- // (1) producer that propagates errors
-
- FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
- "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
-
- OneInputStreamOperatorTestHarness<String, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
-
- testHarness.open();
-
- try {
- testHarness.processElement(new StreamRecord<>("value"));
- testHarness.processElement(new StreamRecord<>("value"));
- fail("This should fail with an exception");
- }
- catch (Exception e) {
- assertNotNull(e.getCause());
- assertNotNull(e.getCause().getMessage());
- assertTrue(e.getCause().getMessage().contains("Test error"));
- }
-
- testHarness.close();
-
- // (2) producer that only logs errors
-
- FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
- "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
- producerLogging.setLogFailuresOnly(true);
-
- testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerLogging));
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>("value"));
- testHarness.processElement(new StreamRecord<>("value"));
-
- testHarness.close();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
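
The two producer modes exercised above differ in a single setter; a minimal sketch of the logging-only variant (topic and broker address are placeholders):

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class LoggingProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        FlinkKafkaProducer08<String> producer =
                new FlinkKafkaProducer08<>("my-topic", new SimpleStringSchema(), props);

        // default (false): an asynchronous send failure is rethrown and fails
        // the job; with true, the failure is only logged and processing continues
        producer.setLogFailuresOnly(true);

        // the producer is then attached with stream.addSink(producer)
    }
}
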