You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:45 UTC
[28/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector]
Refactor, cleanup, and fix kafka consumers
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
index 1f244c1..9638b84 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.internals;
-import com.google.common.base.Preconditions;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
@@ -27,345 +27,576 @@ import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
-import org.apache.flink.api.java.tuple.Tuple2;
+
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.StringUtils;
import org.apache.flink.kafka_backport.common.Node;
import org.apache.flink.kafka_backport.common.PartitionInfo;
import org.apache.flink.kafka_backport.common.TopicPartition;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+/**
+ * This fetcher uses Kafka's low-level API to pull data from a specific
+ * set of partitions and offsets for a certain topic.
+ *
+ * <p>This code is in parts based on the tutorial code for the low-level Kafka consumer.</p>
+ */
public class LegacyFetcher implements Fetcher {
- public static Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
+ /** The topic from which this fetcher pulls data */
private final String topic;
+
+ /** The properties that configure the Kafka connection */
+ private final Properties config;
+
+ /** The task name, to give more readable names to the spawned threads */
+ private final String taskName;
+
+ /** The first error that occurred in a connection thread */
+ private final AtomicReference<Throwable> error;
+
+ /** The partitions that the fetcher should read, with their starting offsets */
private Map<TopicPartition, Long> partitionsToRead;
- private boolean running = true;
- private Properties config;
-
- public final static String QUEUE_SIZE_KEY = "flink.kafka.consumer.queue.size";
- public final static String DEFAULT_QUEUE_SIZE = "10000";
+
+ /** Reference to the thread that executed the run() method. */
+ private volatile Thread mainThread;
+
+ /** Flag to shut the fetcher down */
+ private volatile boolean running = true;
public LegacyFetcher(String topic, Properties props) {
- config = props;
- this.topic = topic;
+ this(topic, props, "");
+ }
+
+ public LegacyFetcher(String topic, Properties props, String taskName) {
+ this.config = checkNotNull(props, "The config properties cannot be null");
+ this.topic = checkNotNull(topic, "The topic cannot be null");
+ this.taskName = taskName;
+ this.error = new AtomicReference<>();
}
+ // ------------------------------------------------------------------------
+ // Fetcher methods
+ // ------------------------------------------------------------------------
+
@Override
- public void partitionsToRead(List<TopicPartition> partitions) {
+ public void setPartitionsToRead(List<TopicPartition> partitions) {
partitionsToRead = new HashMap<TopicPartition, Long>(partitions.size());
- for(TopicPartition tp: partitions) {
- partitionsToRead.put(tp, FlinkKafkaConsumerBase.OFFSET_NOT_SET);
+ for (TopicPartition tp: partitions) {
+ partitionsToRead.put(tp, FlinkKafkaConsumer.OFFSET_NOT_SET);
}
}
@Override
public void seek(TopicPartition topicPartition, long offsetToRead) {
- if(partitionsToRead == null) {
+ if (partitionsToRead == null) {
throw new IllegalArgumentException("No partitions to read set");
}
- if(!partitionsToRead.containsKey(topicPartition)) {
- throw new IllegalArgumentException("Can not set offset on a partition ("+topicPartition+") we are not going to read. " +
- "Partitions to read "+partitionsToRead);
+ if (!partitionsToRead.containsKey(topicPartition)) {
+ throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition
+ + ") we are not going to read. Partitions to read " + partitionsToRead);
}
partitionsToRead.put(topicPartition, offsetToRead);
}
-
+
@Override
public void close() {
- running = false;
+ // flag needs to be checked by the run() method that creates the spawned threads
+ this.running = false;
+
+ // all other cleanup is made by the run method itself
}
@Override
- public <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] lastOffsets) {
- if(partitionsToRead == null || partitionsToRead.size() == 0) {
+ public <T> void run(SourceFunction.SourceContext<T> sourceContext,
+ DeserializationSchema<T> valueDeserializer,
+ long[] lastOffsets) throws Exception {
+
+ if (partitionsToRead == null || partitionsToRead.size() == 0) {
throw new IllegalArgumentException("No partitions set");
}
+
+ // NOTE: This method needs to always release all resources it acquires
+
+ this.mainThread = Thread.currentThread();
- LOG.info("Reading from partitions "+partitionsToRead+" using the legacy fetcher");
+ LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");
+
// get lead broker for each partition
- List<PartitionInfo> allPartitionsInTopic = FlinkKafkaConsumerBase.getPartitionsForTopic(topic, config);
-
+
+ // NOTE: The kafka client apparently locks itself in an infinite loop sometimes
+ // when it is interrupted, so we run it only in a separate thread.
+ // since it sometimes refuses to shut down, we resort to the admittedly harsh
+ // means of killing the thread after a timeout.
+ PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config);
+ infoFetcher.start();
+
+ KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
+ watchDog.start();
+
+ final List<PartitionInfo> allPartitionsInTopic = infoFetcher.getPartitions();
+
// brokers to fetch partitions from.
int fetchPartitionsCount = 0;
Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<Node, List<FetchPartition>>();
- for(PartitionInfo partitionInfo : allPartitionsInTopic) {
- if(partitionInfo.leader() == null) {
- throw new RuntimeException("Unable to consume partition "+partitionInfo.partition()+" from topic "+partitionInfo.topic()+" because it does not have a leader");
+
+ for (PartitionInfo partitionInfo : allPartitionsInTopic) {
+ if (partitionInfo.leader() == null) {
+ throw new RuntimeException("Unable to consume partition " + partitionInfo.partition()
+ + " from topic "+partitionInfo.topic()+" because it does not have a leader");
}
- for(Map.Entry<TopicPartition, Long> partitionToRead: partitionsToRead.entrySet()) {
- if(partitionToRead.getKey().partition() == partitionInfo.partition()) {
+
+ for (Map.Entry<TopicPartition, Long> entry : partitionsToRead.entrySet()) {
+ final TopicPartition topicPartition = entry.getKey();
+ final long offset = entry.getValue();
+
+ // check if that partition is for us
+ if (topicPartition.partition() == partitionInfo.partition()) {
List<FetchPartition> partitions = fetchBrokers.get(partitionInfo.leader());
- if(partitions == null) {
+ if (partitions == null) {
partitions = new ArrayList<FetchPartition>();
+ fetchBrokers.put(partitionInfo.leader(), partitions);
}
- FetchPartition fp = new FetchPartition();
- fp.nextOffsetToRead = partitionToRead.getValue();
- fp.partition = partitionToRead.getKey().partition();
- partitions.add(fp);
+
+ partitions.add(new FetchPartition(topicPartition.partition(), offset));
fetchPartitionsCount++;
- fetchBrokers.put(partitionInfo.leader(), partitions);
+
}
+ // else this partition is not for us
}
}
- if(partitionsToRead.size() != fetchPartitionsCount) {
- throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "+fetchPartitionsCount+" partition infos with lead brokers.");
+
+ if (partitionsToRead.size() != fetchPartitionsCount) {
+ throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "
+ + fetchPartitionsCount + " partition infos with lead brokers.");
}
- // Create a queue for the threads to communicate
- int queueSize = Integer.valueOf(config.getProperty(QUEUE_SIZE_KEY, DEFAULT_QUEUE_SIZE));
- LinkedBlockingQueue<Tuple2<MessageAndOffset, Integer>> messageQueue = new LinkedBlockingQueue<Tuple2<MessageAndOffset, Integer>>(queueSize);
// create SimpleConsumers for each broker
- List<SimpleConsumerThread> consumers = new ArrayList<SimpleConsumerThread>(fetchBrokers.size());
- for(Map.Entry<Node, List<FetchPartition>> brokerInfo: fetchBrokers.entrySet()) {
- SimpleConsumerThread thread = new SimpleConsumerThread(this.config, topic, brokerInfo.getKey(), brokerInfo.getValue(), messageQueue);
+ ArrayList<SimpleConsumerThread<?>> consumers = new ArrayList<>(fetchBrokers.size());
+
+ for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) {
+ final Node broker = brokerInfo.getKey();
+ final List<FetchPartition> partitionsList = brokerInfo.getValue();
+
+ FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
+
+ SimpleConsumerThread<T> thread = new SimpleConsumerThread<T>(this, config, topic,
+ broker, partitions, sourceContext, valueDeserializer, lastOffsets);
+
+ thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
+ taskName, broker.idString(), broker.host(), broker.port()));
thread.setDaemon(true);
- thread.setName("KafkaConsumer-SimpleConsumer-" + brokerInfo.getKey().idString());
- thread.start();
consumers.add(thread);
- LOG.info("Starting thread "+thread.getName()+" for fetching from broker "+brokerInfo.getKey().host());
}
-
- // read from queue:
- while(running) {
- try {
- Tuple2<MessageAndOffset, Integer> msg = messageQueue.take();
- ByteBuffer payload = msg.f0.message().payload();
- byte[] valueByte = new byte[payload.limit()];
- payload.get(valueByte);
- T value = valueDeserializer.deserialize(valueByte);
- synchronized (sourceContext.getCheckpointLock()) {
- lastOffsets[msg.f1] = msg.f0.offset();
- sourceContext.collect(value);
+
+ // last check whether we should abort.
+ if (!running) {
+ return;
+ }
+
+ // start all consumer threads
+ for (SimpleConsumerThread<?> t : consumers) {
+ LOG.info("Starting thread {}", t.getName());
+ t.start();
+ }
+
+ // wait until all consumer threads are done, or until we are aborted, or until
+ // an error occurred in one of the fetcher threads
+ try {
+ boolean someConsumersRunning = true;
+ while (running && error.get() == null && someConsumersRunning) {
+ try {
+ // wait for the consumer threads. if an error occurs, we are interrupted
+ for (SimpleConsumerThread<?> t : consumers) {
+ t.join();
+ }
+
+ // safety net
+ someConsumersRunning = false;
+ for (SimpleConsumerThread<?> t : consumers) {
+ someConsumersRunning |= t.isAlive();
+ }
}
- } catch (InterruptedException e) {
- LOG.info("Queue consumption thread got interrupted. Stopping consumption and interrupting other threads");
- running = false;
- for(SimpleConsumerThread t: consumers) {
- t.interrupt();
+ catch (InterruptedException e) {
+ // ignore. we should notice what happened in the next loop check
}
}
-
- // see how the consumer threads are doing:
- for(SimpleConsumerThread t: consumers) {
- if(t.getError() != null) {
- throw new RuntimeException("Consumer thread "+t.getName()+" had an exception", t.getError());
- }
+
+ // make sure any asynchronous error is noticed
+ Throwable error = this.error.get();
+ if (error != null) {
+ throw new Exception(error.getMessage(), error);
}
}
-
- for(SimpleConsumerThread t: consumers) {
- t.close();
+ finally {
+ // make sure that in any case (completion, abort, error), all spawned threads are stopped
+ for (SimpleConsumerThread<?> t : consumers) {
+ if (t.isAlive()) {
+ t.cancel();
+ }
+ }
}
- sourceContext.close();
- }
-
- @Override
- public void stop() {
- running = false;
}
-
- @Override
- public void commit(Map<TopicPartition, Long> offsetsToCommit) {
- throw new UnsupportedOperationException("This fetcher does not support committing offsets");
+
+ /**
+ * Reports an error from a fetch thread. This will cause the main thread to see this error,
+ * abort, and cancel all other fetch threads.
+ *
+ * @param error The error to report.
+ */
+ void onErrorInFetchThread(Throwable error) {
+ if (this.error.compareAndSet(null, error)) {
+ // we are the first to report an error
+ if (mainThread != null) {
+ mainThread.interrupt();
+ }
+ }
}
+ // ------------------------------------------------------------------------
/**
* Representation of a partition to fetch.
*/
private static class FetchPartition {
- /**
- * ID of the partition within the topic (0 indexed, as given by Kafka)
- */
- public int partition;
- /**
- * Offset pointing at the next element to read from that partition.
- */
- public long nextOffsetToRead;
-
+
+ /** ID of the partition within the topic (0 indexed, as given by Kafka) */
+ int partition;
+
+ /** Offset pointing at the next element to read from that partition. */
+ long nextOffsetToRead;
+
+ FetchPartition(int partition, long nextOffsetToRead) {
+ this.partition = partition;
+ this.nextOffsetToRead = nextOffsetToRead;
+ }
+
@Override
public String toString() {
- return "FetchPartition{" +
- "partition=" + partition +
- ", offset=" + nextOffsetToRead +
- '}';
+ return "FetchPartition {partition=" + partition + ", offset=" + nextOffsetToRead + '}';
}
}
- // -------------------------- Thread for a connection to a broker --------------------------
-
- private static class SimpleConsumerThread extends Thread {
-
- private final SimpleConsumer consumer;
- private final List<FetchPartition> partitions;
- private final LinkedBlockingQueue<Tuple2<MessageAndOffset, Integer>> messageQueue;
- private final String clientId;
+ // ------------------------------------------------------------------------
+ // Per broker fetcher
+ // ------------------------------------------------------------------------
+
+ /**
+ * Each broker needs its separate connection. This thread implements the connection to
+ * one broker. The connection can fetch multiple partitions from the broker.
+ *
+ * @param <T> The data type fetched.
+ */
+ private static class SimpleConsumerThread<T> extends Thread {
+
+ private final SourceFunction.SourceContext<T> sourceContext;
+ private final DeserializationSchema<T> valueDeserializer;
+ private final long[] offsetsState;
+
+ private final FetchPartition[] partitions;
+
+ private final Node broker;
private final String topic;
+ private final Properties config;
- private final int fetchSize;
- private final int maxWait;
- private final int minBytes;
+ private final LegacyFetcher owner;
- private boolean running = true;
- private Throwable error = null;
+ private SimpleConsumer consumer;
+
+ private volatile boolean running = true;
// exceptions are thrown locally
- public SimpleConsumerThread(Properties config, String topic, Node leader, List<FetchPartition> partitions, LinkedBlockingQueue<Tuple2<MessageAndOffset, Integer>> messageQueue) {
- Preconditions.checkNotNull(leader, "Leader can not be null");
- Preconditions.checkNotNull(config, "The config properties can not be null");
- // these are the actual configuration values of Kafka + their original default values.
- int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
- int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
-
- this.fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
- this.maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
- this.minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
-
+ public SimpleConsumerThread(LegacyFetcher owner,
+ Properties config, String topic,
+ Node broker,
+ FetchPartition[] partitions,
+ SourceFunction.SourceContext<T> sourceContext,
+ DeserializationSchema<T> valueDeserializer,
+ long[] offsetsState) {
+ this.owner = owner;
+ this.config = config;
this.topic = topic;
+ this.broker = broker;
this.partitions = partitions;
- this.messageQueue = messageQueue;
- this.clientId = "flink-kafka-consumer-legacy-" + leader.idString();
- // create consumer
- consumer = new SimpleConsumer(leader.host(), leader.port(), bufferSize, soTimeout, clientId);
-
- // list of partitions for which we need to get offsets (this is only effective if the offset is really not initialized
- List<FetchPartition> getOffsetPartitions = new ArrayList<FetchPartition>();
- for (FetchPartition fp : partitions) {
- if (fp.nextOffsetToRead == FlinkKafkaConsumerBase.OFFSET_NOT_SET) {
- // retrieve the offset from the consumer
- getOffsetPartitions.add(fp);
- }
- }
- if (getOffsetPartitions.size() > 0) {
- long timeType = 0;
- if (config.getProperty("auto.offset.reset", "latest").equals("latest")) {
- timeType = OffsetRequest.LatestTime();
- } else {
- timeType = OffsetRequest.EarliestTime();
- }
- getLastOffset(consumer, topic, getOffsetPartitions, timeType);
- LOG.info("No offsets found for topic " + topic + ", fetched the following start offsets {}", getOffsetPartitions);
- }
+ this.sourceContext = checkNotNull(sourceContext);
+ this.valueDeserializer = checkNotNull(valueDeserializer);
+ this.offsetsState = checkNotNull(offsetsState);
}
@Override
public void run() {
try {
+ // set up the config values
+ final String clientId = "flink-kafka-consumer-legacy-" + broker.idString();
+
+ // these are the actual configuration values of Kafka + their original default values.
+
+ final int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
+ final int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
+ final int fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
+ final int maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
+ final int minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
+
+ // create the Kafka consumer that we actually use for fetching
+ consumer = new SimpleConsumer(broker.host(), broker.port(), bufferSize, soTimeout, clientId);
+
+ // make sure that all partitions have some offsets to start with
+ // those partitions that do not have an offset from a checkpoint need to get
+ // their start offset from the Kafka broker (see getLastOffset)
+
+ List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<FetchPartition>();
+
+ for (FetchPartition fp : partitions) {
+ if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
+ // retrieve the offset from the consumer
+ partitionsToGetOffsetsFor.add(fp);
+ }
+ }
+ if (partitionsToGetOffsetsFor.size() > 0) {
+ long timeType;
+ if (config.getProperty("auto.offset.reset", "latest").equals("latest")) {
+ timeType = OffsetRequest.LatestTime();
+ } else {
+ timeType = OffsetRequest.EarliestTime();
+ }
+ getLastOffset(consumer, topic, partitionsToGetOffsetsFor, timeType);
+ LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
+ topic, partitionsToGetOffsetsFor);
+ }
+
+ // Now, the actual work starts :-)
+
while (running) {
FetchRequestBuilder frb = new FetchRequestBuilder();
- frb.clientId(this.clientId);
+ frb.clientId(clientId);
frb.maxWait(maxWait);
frb.minBytes(minBytes);
+
for (FetchPartition fp : partitions) {
- frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, this.fetchSize);
+ frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize);
}
kafka.api.FetchRequest fetchRequest = frb.build();
LOG.debug("Issuing fetch request {}", fetchRequest);
- FetchResponse fetchResponse = null;
+ FetchResponse fetchResponse;
fetchResponse = consumer.fetch(fetchRequest);
-
if (fetchResponse.hasError()) {
String exception = "";
for (FetchPartition fp : partitions) {
short code;
if ((code = fetchResponse.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
- exception += "\nException for partition " + fp.partition + ": " + StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+ exception += "\nException for partition " + fp.partition + ": " +
+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
}
}
- throw new RuntimeException("Error while fetching from broker: " + exception);
+ throw new IOException("Error while fetching from broker: " + exception);
}
int messagesInFetch = 0;
for (FetchPartition fp : partitions) {
- ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
+ final ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
+ final int partition = fp.partition;
+
for (MessageAndOffset msg : messageSet) {
- messagesInFetch++;
- try {
+ if (running) {
+ messagesInFetch++;
if (msg.offset() < fp.nextOffsetToRead) {
- LOG.info("Skipping message with offset " + msg.offset() + " because we have seen messages until " + fp.nextOffsetToRead + " from partition " + fp.partition + " already");
// we have seen this message already
+ LOG.info("Skipping message with offset " + msg.offset()
+ + " because we have seen messages until " + fp.nextOffsetToRead
+ + " from partition " + fp.partition + " already");
continue;
}
- messageQueue.put(new Tuple2<MessageAndOffset, Integer>(msg, fp.partition));
- fp.nextOffsetToRead = msg.offset() + 1; // advance offset for the next request
- } catch (InterruptedException e) {
- LOG.debug("Consumer thread got interrupted. Stopping consumption");
- running = false;
+
+ ByteBuffer payload = msg.message().payload();
+ byte[] valueByte = new byte[payload.remaining()];
+ payload.get(valueByte);
+
+ final T value = valueDeserializer.deserialize(valueByte);
+ final long offset = msg.offset();
+
+ synchronized (sourceContext.getCheckpointLock()) {
+ offsetsState[partition] = offset;
+ sourceContext.collect(value);
+ }
+
+ // advance offset for the next request
+ fp.nextOffsetToRead = offset + 1;
+ }
+ else {
+ // no longer running
+ return;
}
}
}
LOG.debug("This fetch contained {} messages", messagesInFetch);
}
- } catch(Throwable cause) {
- this.error = new RuntimeException("Error while reading data in thread "+this.getName(), cause);
- } finally {
+ }
+ catch (Throwable t) {
+ // report to the main thread
+ owner.onErrorInFetchThread(t);
+ }
+ finally {
// end of run loop. close connection to consumer
- consumer.close();
+ if (consumer != null) {
+ // closing the consumer should not fail the program
+ try {
+ consumer.close();
+ }
+ catch (Throwable t) {
+ LOG.error("Error while closing the Kafka simple consumer", t);
+ }
+ }
}
-
}
- public void close() {
- running = false;
- consumer.close();
+ /**
+ * Cancels this fetch thread. The thread will release all resources and terminate.
+ */
+ public void cancel() {
+ this.running = false;
+
+ // interrupt whatever the consumer is doing
+ if (consumer != null) {
+ consumer.close();
+ }
+
+ this.interrupt();
}
- public Throwable getError() {
- return error;
+ /**
+ * Request latest offsets for a set of partitions, via a Kafka consumer.
+ *
+ * @param consumer The consumer connected to lead broker
+ * @param topic The topic name
+ * @param partitions The list of partitions we need offsets for
+ * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
+ */
+ private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
+
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ for (FetchPartition fp: partitions) {
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+ }
+
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+ OffsetResponse response = consumer.getOffsetsBefore(request);
+
+ if (response.hasError()) {
+ String exception = "";
+ for (FetchPartition fp: partitions) {
+ short code;
+ if ( (code=response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
+ exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+ }
+ }
+ throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions
+ + ". " + exception);
+ }
+
+ for (FetchPartition fp: partitions) {
+ // the resulting offset is the next offset we are going to read
+ // for not-yet-consumed partitions, it is 0.
+ fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
+ }
}
}
+
+ private static class PartitionInfoFetcher extends Thread {
- /**
- * Request latest offsets from Kafka.
- *
- * @param consumer consumer connected to lead broker
- * @param topic topic name
- * @param partitions list of partitions we need offsets for
- * @param whichTime type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
- */
- private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
+ private final String topic;
+ private final Properties properties;
+
+ private volatile List<PartitionInfo> result;
+ private volatile Throwable error;
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- for(FetchPartition fp: partitions) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+
+ PartitionInfoFetcher(String topic, Properties properties) {
+ this.topic = topic;
+ this.properties = properties;
}
- kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
- OffsetResponse response = consumer.getOffsetsBefore(request);
-
- if (response.hasError()) {
- String exception = "";
- for(FetchPartition fp: partitions) {
- short code;
- if( (code=response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
- exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
- }
+ @Override
+ public void run() {
+ try {
+ result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties);
+ }
+ catch (Throwable t) {
+ this.error = t;
+ }
+ }
+
+ public List<PartitionInfo> getPartitions() throws Exception {
+ try {
+ this.join();
}
- throw new RuntimeException("Unable to get last offset for topic "+topic+" and partitions "+partitions +". "+exception);
+ catch (InterruptedException e) {
+ throw new Exception("Partition fetching was cancelled before completion");
+ }
+
+ if (error != null) {
+ throw new Exception("Failed to fetch partitions for topic " + topic, error);
+ }
+ if (result != null) {
+ return result;
+ }
+ throw new Exception("Partition fetching failed");
}
+ }
- for(FetchPartition fp: partitions) {
- // the resulting offset is the next offset we are going to read
- // for not-yet-consumed partitions, it is 0.
- fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
+ private static class KillerWatchDog extends Thread {
+
+ private final Thread toKill;
+ private final long timeout;
+
+ private KillerWatchDog(Thread toKill, long timeout) {
+ super("KillerWatchDog");
+ setDaemon(true);
+
+ this.toKill = toKill;
+ this.timeout = timeout;
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public void run() {
+ final long deadline = System.currentTimeMillis() + timeout;
+ long now;
+
+ while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
+ try {
+ toKill.join(deadline - now);
+ }
+ catch (InterruptedException e) {
+ // ignore here, our job is important!
+ }
+ }
+
+ // this is harsh, but this watchdog is a last resort
+ if (toKill.isAlive()) {
+ toKill.stop();
+ }
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java
new file mode 100644
index 0000000..db9424e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.kafka_backport.clients.consumer.CommitType;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecord;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecords;
+import org.apache.flink.kafka_backport.clients.consumer.KafkaConsumer;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.serialization.ByteArrayDeserializer;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A fetcher that uses the new Kafka consumer API to fetch data for a specified set of partitions.
+ *
+ * <p>Every call into the underlying {@link KafkaConsumer} is made while holding the consumer
+ * object's monitor.
+ */
+public class NewConsumerApiFetcher implements Fetcher, OffsetHandler {
+
+	/** Configuration key under which the poll timeout can be overridden. */
+	private static final String POLL_TIMEOUT_PROPERTY = "flink.kafka.consumer.poll.timeout";
+
+	/** Poll timeout that is used when the configuration does not define one. */
+	private static final long DEFAULT_POLL_TIMEOUT = 50;
+
+	/** Passes keys and values through as raw bytes; deserialization happens in {@link #run}. */
+	private static final ByteArrayDeserializer NO_OP_SERIALIZER = new ByteArrayDeserializer();
+
+
+	private final KafkaConsumer<byte[], byte[]> fetcher;
+
+	private final long pollTimeout;
+
+	/** Cleared by {@link #close()} to make the fetch loop in {@link #run} terminate. */
+	private volatile boolean running = true;
+
+
+	/**
+	 * Creates the fetcher together with its underlying Kafka consumer.
+	 *
+	 * @param props The consumer configuration. The key "flink.kafka.consumer.poll.timeout"
+	 *              may be set to override the default poll timeout.
+	 * @throws NumberFormatException Thrown, if the configured poll timeout is not a valid long.
+	 */
+	public NewConsumerApiFetcher(Properties props) {
+		// Look the timeout up by key. Properties.contains(Object) is inherited from Hashtable
+		// and tests whether a *value* is present, so a configured timeout was ignored.
+		final String configuredTimeout = props.getProperty(POLL_TIMEOUT_PROPERTY);
+		this.pollTimeout = (configuredTimeout != null) ?
+				Long.parseLong(configuredTimeout) :
+				DEFAULT_POLL_TIMEOUT;
+
+		this.fetcher = new KafkaConsumer<byte[], byte[]>(props, null, NO_OP_SERIALIZER, NO_OP_SERIALIZER);
+	}
+
+	/**
+	 * Subscribes the consumer to the given partitions. May only be called once.
+	 *
+	 * @param partitions The partitions that this fetcher reads from.
+	 * @throws IllegalStateException Thrown, if the consumer already has subscriptions.
+	 */
+	@Override
+	public void setPartitionsToRead(List<TopicPartition> partitions) {
+		synchronized (fetcher) {
+			if (fetcher.subscriptions().isEmpty()) {
+				fetcher.subscribe(partitions.toArray(new TopicPartition[partitions.size()]));
+			}
+			else {
+				throw new IllegalStateException("Fetcher has already subscribed to its set of partitions");
+			}
+		}
+	}
+
+	/**
+	 * Positions the consumer for the given partition at the given offset.
+	 *
+	 * @param topicPartition The partition to position.
+	 * @param offsetToRead The offset that is handed to the consumer's seek call.
+	 */
+	@Override
+	public void seek(TopicPartition topicPartition, long offsetToRead) {
+		synchronized (fetcher) {
+			fetcher.seek(topicPartition, offsetToRead);
+		}
+	}
+
+	/**
+	 * Signals the fetch loop to stop and closes the underlying consumer.
+	 */
+	@Override
+	public void close() {
+		// clear the flag first, so that the fetch loop sees it once it obtains the lock
+		running = false;
+		synchronized (fetcher) {
+			fetcher.close();
+		}
+	}
+
+	/**
+	 * Polls the consumer until {@link #close()} is called, deserializes every record value
+	 * and emits it through the source context.
+	 *
+	 * @param sourceContext The context to emit the elements to; its checkpoint lock is held
+	 *                      while an element is emitted and its offset is recorded.
+	 * @param valueDeserializer The schema that turns the raw record value into an element.
+	 * @param lastOffsets Array indexed by partition id; receives the offset of the most
+	 *                    recently emitted record of each partition.
+	 */
+	@Override
+	public <T> void run(SourceFunction.SourceContext<T> sourceContext,
+						DeserializationSchema<T> valueDeserializer, long[] lastOffsets) {
+		while (running) {
+			// poll is always returning a new object.
+			final ConsumerRecords<byte[], byte[]> consumed;
+			synchronized (fetcher) {
+				// Re-check under the lock: close() clears the flag before it closes the
+				// consumer under this same lock, so a closed consumer is never polled.
+				if (!running) {
+					break;
+				}
+				consumed = fetcher.poll(pollTimeout);
+			}
+
+			final Iterator<ConsumerRecord<byte[], byte[]>> records = consumed.iterator();
+			while (running && records.hasNext()) {
+				ConsumerRecord<byte[], byte[]> record = records.next();
+				T value = valueDeserializer.deserialize(record.value());
+
+				// the checkpoint lock is taken per record (not around the whole batch),
+				// which leaves room for a checkpoint between any two records
+				synchronized (sourceContext.getCheckpointLock()) {
+					sourceContext.collect(value);
+					lastOffsets[record.partition()] = record.offset();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Synchronously commits the given offsets through the Kafka consumer.
+	 *
+	 * @param offsetsToCommit The offset to commit, per partition.
+	 */
+	@Override
+	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
+		synchronized (fetcher) {
+			fetcher.commit(offsetsToCommit, CommitType.SYNC);
+		}
+	}
+
+	/**
+	 * Intentionally does nothing.
+	 *
+	 * @param partitions Ignored.
+	 * @param fetcher Ignored.
+	 */
+	@Override
+	public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
+		// no need to do anything here.
+		// if Kafka manages the offsets, it has them automatically
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
new file mode 100644
index 0000000..d7eb19d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The offset handler is responsible for locating the initial partition offsets
+ * where the source should start reading, as well as committing offsets from completed
+ * checkpoints.
+ */
+public interface OffsetHandler {
+
+ /**
+ * Commits the given offsets for the partitions. May commit the offsets to the Kafka broker,
+ * or to ZooKeeper, based on its configured behavior.
+ *
+ * @param offsetsToCommit The offset to commit, per partition.
+ *
+ * @throws Exception Declared so that implementations can report a failed commit.
+ */
+ void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
+
+ /**
+ * Positions the given fetcher to the initial read offsets where the stream consumption
+ * will start from.
+ *
+ * @param partitions The partitions for which to seek the fetcher to the initial offsets.
+ * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
+ *
+ * @throws Exception Declared so that implementations can report a failure to look up
+ *                   or apply the initial offsets.
+ */
+ void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
+
+ /**
+ * Closes the offset handler, releasing all resources.
+ *
+ * @throws IOException Thrown, if the closing fails.
+ */
+ void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..a6417a7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * A {@link ZkSerializer} for Strings: values are written as UTF-8 encoded bytes
+ * and read back as Strings using the same charset.
+ */
+public class ZooKeeperStringSerializer implements ZkSerializer {
+
+	private static final Charset CHARSET = Charset.forName("UTF-8");
+
+	/**
+	 * Encodes the given String as UTF-8 bytes.
+	 *
+	 * @param data The value to serialize, must be a String.
+	 * @return The UTF-8 encoded bytes of the String.
+	 * @throws IllegalArgumentException Thrown, if the value is not a String (including null).
+	 */
+	@Override
+	public byte[] serialize(Object data) {
+		if (!(data instanceof String)) {
+			throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
+		}
+		return ((String) data).getBytes(CHARSET);
+	}
+
+	/**
+	 * Decodes the given bytes as a UTF-8 String.
+	 *
+	 * @param bytes The bytes to decode, may be null.
+	 * @return The decoded String, or null if the bytes were null.
+	 */
+	@Override
+	public Object deserialize(byte[] bytes) {
+		return (bytes == null) ? null : new String(bytes, CHARSET);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
new file mode 100644
index 0000000..9dd1192
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import kafka.common.TopicAndPartition;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerConfig;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
+import org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * An {@link OffsetHandler} that stores and looks up the offsets of a consumer group
+ * in ZooKeeper, under the group's consumer offset directory of the topic.
+ */
+public class ZookeeperOffsetHandler implements OffsetHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
+
+	private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
+
+
+	private final ZkClient zkClient;
+
+	private final String groupId;
+
+
+	/**
+	 * Creates the handler and connects to ZooKeeper.
+	 *
+	 * @param props The configuration. Must define the consumer group id and
+	 *              'zookeeper.connect'; the session and connection timeouts are optional
+	 *              and default to 6000.
+	 * @throws IllegalArgumentException Thrown, if a required property is missing.
+	 */
+	public ZookeeperOffsetHandler(Properties props) {
+		final String group = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+		if (group == null) {
+			throw new IllegalArgumentException("Required property '"
+					+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
+		}
+		this.groupId = group;
+
+		final String zkConnect = props.getProperty("zookeeper.connect");
+		if (zkConnect == null) {
+			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
+		}
+
+		final int sessionTimeout =
+				Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "6000"));
+		final int connectionTimeout =
+				Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "6000"));
+
+		this.zkClient = new ZkClient(zkConnect, sessionTimeout, connectionTimeout,
+				new ZooKeeperStringSerializer());
+	}
+
+
+	/**
+	 * Writes the given offsets to ZooKeeper. Entries with a negative offset are skipped.
+	 *
+	 * @param offsetsToCommit The offset to commit, per partition.
+	 */
+	@Override
+	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
+		for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
+			final long offset = entry.getValue();
+			if (offset < 0) {
+				// negative offsets are not written
+				continue;
+			}
+
+			final TopicPartition tp = entry.getKey();
+			setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
+		}
+	}
+
+	/**
+	 * Seeks the fetcher, for every partition that has an offset stored in ZooKeeper,
+	 * to the position right after that stored offset. Other partitions are left untouched.
+	 *
+	 * @param partitions The partitions to look up.
+	 * @param fetcher The fetcher to position.
+	 */
+	@Override
+	public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
+		for (TopicPartition tp : partitions) {
+			final long storedOffset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
+			if (storedOffset == OFFSET_NOT_SET) {
+				continue;
+			}
+
+			LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
+					tp.partition(), storedOffset);
+
+			// ZooKeeper holds the last read offset, while seek expects the next offset to read
+			fetcher.seek(tp, storedOffset + 1);
+		}
+	}
+
+	/**
+	 * Closes the ZooKeeper client.
+	 */
+	@Override
+	public void close() throws IOException {
+		zkClient.close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Communication with Zookeeper
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Stores the offset, as a decimal string, in the node of the given partition below
+	 * the consumer offset directory of the group and topic.
+	 */
+	public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
+		final TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		final ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+		final String offsetPath = topicDirs.consumerOffsetDir() + "/" + tap.partition();
+
+		ZkUtils.updatePersistentPath(zkClient, offsetPath, Long.toString(offset));
+	}
+
+	/**
+	 * Reads the offset stored for the given partition of the group and topic.
+	 *
+	 * @return The stored offset, or the OFFSET_NOT_SET marker if ZooKeeper holds no value.
+	 */
+	public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
+		final TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		final ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+		final String offsetPath = topicDirs.consumerOffsetDir() + "/" + tap.partition();
+
+		final scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient, offsetPath);
+		final Option<String> storedValue = data._1();
+
+		return storedValue.isEmpty() ? OFFSET_NOT_SET : Long.parseLong(storedValue.get());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
index 9a20186..218315f 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
@@ -14,56 +14,87 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors;
-import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import java.util.Arrays;
+import org.junit.Test;
+
import java.util.Properties;
-public class Kafka081ITCase extends KafkaTestBase {
+public class Kafka081ITCase extends KafkaConsumerTestBase {
+
@Override
- <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema deserializationSchema, Properties props) {
- return new TestFlinkKafkaConsumer081<T>(topic, deserializationSchema, props);
+ protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
+ return new FlinkKafkaConsumer081<T>(topic, deserializationSchema, props);
+ }
+
+ // ------------------------------------------------------------------------
+ // Suite of Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testCheckpointing() {
+ runCheckpointingTest();
}
- @Override
- long[] getFinalOffsets() {
- return TestFlinkKafkaConsumer081.finalOffset;
+ @Test
+ public void testOffsetInZookeeper() {
+ runOffsetInZookeeperValidationTest();
+ }
+
+ @Test
+ public void testConcurrentProducerConsumerTopology() {
+ runSimpleConcurrentProducerConsumerTopology();
}
- @Override
- void resetOffsets() {
- TestFlinkKafkaConsumer081.finalOffset = null;
+ // --- canceling / failures ---
+
+ @Test
+ public void testCancelingEmptyTopic() {
+ runCancelingOnEmptyInputTest();
+ }
+
+ @Test
+ public void testCancelingFullTopic() {
+ runCancelingOnFullInputTest();
}
+ @Test
+ public void testFailOnDeploy() {
+ runFailOnDeployTest();
+ }
- public static class TestFlinkKafkaConsumer081<OUT> extends FlinkKafkaConsumer081<OUT> {
- public static long[] finalOffset;
- public TestFlinkKafkaConsumer081(String topicName, DeserializationSchema<OUT> deserializationSchema, Properties consumerConfig) {
- super(topicName, deserializationSchema, consumerConfig);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- synchronized (commitedOffsets) {
- LOG.info("Setting final offset from "+ Arrays.toString(commitedOffsets));
- if (finalOffset == null) {
- finalOffset = new long[commitedOffsets.length];
- }
- for(int i = 0; i < commitedOffsets.length; i++) {
- if(commitedOffsets[i] > 0) {
- if(finalOffset[i] > 0) {
- throw new RuntimeException("This is unexpected on i = "+i);
- }
- finalOffset[i] = commitedOffsets[i];
- }
- }
- }
- }
+ // --- source to partition mappings and exactly once ---
+
+ @Test
+ public void testOneToOneSources() {
+ runOneToOneExactlyOnceTest();
}
+ @Test
+ public void testOneSourceMultiplePartitions() {
+ runOneSourceMultiplePartitionsExactlyOnceTest();
+ }
+
+ @Test
+ public void testMultipleSourcesOnePartition() {
+ runMultipleSourcesOnePartitionExactlyOnceTest();
+ }
+
+ // --- broker failure ---
+
+ @Test
+ public void testBrokerFailure() {
+ runBrokerFailureTest();
+ }
+
+ // --- special executions ---
+
+ @Test
+ public void testBigRecordJob() {
+ runBigRecordTestTopology();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
index 43cd0f9..2f80fcb 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
@@ -14,57 +14,89 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors;
-import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import java.util.Arrays;
+import org.junit.Ignore;
+import org.junit.Test;
+
import java.util.Properties;
-public class Kafka082ITCase extends KafkaTestBase {
+public class Kafka082ITCase extends KafkaConsumerTestBase {
+
@Override
- <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema deserializationSchema, Properties props) {
- return new TestFlinkKafkaConsumer082<T>(topic, deserializationSchema, props);
+ protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
+ return new FlinkKafkaConsumer082<T>(topic, deserializationSchema, props);
}
- @Override
- long[] getFinalOffsets() {
- return TestFlinkKafkaConsumer082.finalOffset;
+ // ------------------------------------------------------------------------
+ // Suite of Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testCheckpointing() {
+ runCheckpointingTest();
}
- @Override
- void resetOffsets() {
- TestFlinkKafkaConsumer082.finalOffset = null;
+ @Test
+ public void testOffsetInZookeeper() {
+ runOffsetInZookeeperValidationTest();
+ }
+
+ @Test
+ public void testConcurrentProducerConsumerTopology() {
+ runSimpleConcurrentProducerConsumerTopology();
+ }
+
+ // --- canceling / failures ---
+
+ @Test
+ public void testCancelingEmptyTopic() {
+ runCancelingOnEmptyInputTest();
}
+ @Test
+ public void testCancelingFullTopic() {
+ runCancelingOnFullInputTest();
+ }
- public static class TestFlinkKafkaConsumer082<OUT> extends FlinkKafkaConsumer082<OUT> {
- private final static Object sync = new Object();
- public static long[] finalOffset;
- public TestFlinkKafkaConsumer082(String topicName, DeserializationSchema<OUT> deserializationSchema, Properties consumerConfig) {
- super(topicName, deserializationSchema, consumerConfig);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- synchronized (commitedOffsets) {
- LOG.info("Setting final offset from "+ Arrays.toString(commitedOffsets));
- if (finalOffset == null) {
- finalOffset = new long[commitedOffsets.length];
- }
- for(int i = 0; i < commitedOffsets.length; i++) {
- if(commitedOffsets[i] > 0) {
- if(finalOffset[i] > 0) {
- throw new RuntimeException("This is unexpected on i = "+i);
- }
- finalOffset[i] = commitedOffsets[i];
- }
- }
- }
- }
+ @Test
+ public void testFailOnDeploy() {
+ runFailOnDeployTest();
}
+ // --- source to partition mappings and exactly once ---
+
+ @Test
+ public void testOneToOneSources() {
+ runOneToOneExactlyOnceTest();
+ }
+
+ @Test
+ public void testOneSourceMultiplePartitions() {
+ runOneSourceMultiplePartitionsExactlyOnceTest();
+ }
+
+ @Test
+ public void testMultipleSourcesOnePartition() {
+ runMultipleSourcesOnePartitionExactlyOnceTest();
+ }
+
+ // --- broker failure ---
+
+ @Test
+ public void testBrokerFailure() {
+ runBrokerFailureTest();
+ }
+
+ // --- special executions ---
+
+ @Test
+ @Ignore("this does not work with the new consumer")
+ public void testBigRecordJob() {
+ runBigRecordTestTopology();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka083ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka083ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka083ITCase.java
deleted file mode 100644
index f7933f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka083ITCase.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.junit.Ignore;
-
-import java.util.Arrays;
-import java.util.Properties;
-
-
-public class Kafka083ITCase extends KafkaTestBase {
- @Override
- <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema deserializationSchema, Properties props) {
- return new TestFlinkKafkaConsumer083<T>(topic, deserializationSchema, props);
- }
-
- @Override
- long[] getFinalOffsets() {
- return TestFlinkKafkaConsumer083.finalOffset;
- }
-
- @Override
- void resetOffsets() {
- TestFlinkKafkaConsumer083.finalOffset = null;
- }
-
-
- public static class TestFlinkKafkaConsumer083<OUT> extends FlinkKafkaConsumer083<OUT> {
- public static long[] finalOffset;
- public TestFlinkKafkaConsumer083(String topicName, DeserializationSchema<OUT> deserializationSchema, Properties consumerConfig) {
- super(topicName, deserializationSchema, consumerConfig);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- synchronized (commitedOffsets) {
- LOG.info("Setting final offset from "+ Arrays.toString(commitedOffsets));
- if (finalOffset == null) {
- finalOffset = new long[commitedOffsets.length];
- }
- for(int i = 0; i < commitedOffsets.length; i++) {
- if(commitedOffsets[i] > 0) {
- if(finalOffset[i] > 0) {
- throw new RuntimeException("This is unexpected on i = "+i);
- }
- finalOffset[i] = commitedOffsets[i];
- }
- }
- }
- }
- }
-
- @Ignore
- @Override
- public void brokerFailureTest() throws Exception {
- // Skipping test: The test is committing the offsets to the Kafka Broker.
- // only 0.8.3 brokers support that.
- return;
- }
-
- @Ignore
- @Override
- public void testFlinkKafkaConsumerWithOffsetUpdates() throws Exception {
- // Skipping test (see above)
- return;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
new file mode 100644
index 0000000..8248cee
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the partition assignment is deterministic and stable.
+ */
+public class KafkaConsumerPartitionAssignmentTest {
+
+ @Test
+ public void testPartitionsEqualConsumers() {
+ try {
+ int[] partitions = {4, 52, 17, 1};
+
+ for (int i = 0; i < partitions.length; i++) {
+ List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+ partitions, "test-topic", partitions.length, i);
+
+ assertNotNull(parts);
+ assertEquals(1, parts.size());
+ assertTrue(contains(partitions, parts.get(0).partition()));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultiplePartitionsPerConsumers() {
+ try {
+ final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+
+ final Set<Integer> allPartitions = new HashSet<>();
+ for (int i : partitions) {
+ allPartitions.add(i);
+ }
+
+ final int numConsumers = 3;
+ final int minPartitionsPerConsumer = partitions.length / numConsumers;
+ final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
+
+ for (int i = 0; i < numConsumers; i++) {
+ List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+ partitions, "test-topic", numConsumers, i);
+
+ assertNotNull(parts);
+ assertTrue(parts.size() >= minPartitionsPerConsumer);
+ assertTrue(parts.size() <= maxPartitionsPerConsumer);
+
+ for (TopicPartition p : parts) {
+ // check that the element was actually contained
+ assertTrue(allPartitions.remove(p.partition()));
+ }
+ }
+
+ // all partitions must have been assigned
+ assertTrue(allPartitions.isEmpty());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPartitionsFewerThanConsumers() {
+ try {
+ final int[] partitions = {4, 52, 17, 1};
+
+ final Set<Integer> allPartitions = new HashSet<>();
+ for (int i : partitions) {
+ allPartitions.add(i);
+ }
+
+ final int numConsumers = 2 * partitions.length + 3;
+
+ for (int i = 0; i < numConsumers; i++) {
+ List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+ partitions, "test-topic", numConsumers, i);
+
+ assertNotNull(parts);
+ assertTrue(parts.size() <= 1);
+
+ for (TopicPartition p : parts) {
+ // check that the element was actually contained
+ assertTrue(allPartitions.remove(p.partition()));
+ }
+ }
+
+ // all partitions must have been assigned
+ assertTrue(allPartitions.isEmpty());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAssignEmptyPartitions() {
+ try {
+ List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
+ assertNotNull(parts1);
+ assertTrue(parts1.isEmpty());
+
+ List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
+ assertNotNull(parts2);
+ assertTrue(parts2.isEmpty());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGrowingPartitionsRemainsStable() {
+ try {
+ final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+ final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
+
+ final Set<Integer> allNewPartitions = new HashSet<>();
+ final Set<Integer> allInitialPartitions = new HashSet<>();
+ for (int i : newPartitions) {
+ allNewPartitions.add(i);
+ }
+ for (int i : initialPartitions) {
+ allInitialPartitions.add(i);
+ }
+
+ final int numConsumers = 3;
+ final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
+ final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
+ final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
+ final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
+
+ List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
+ initialPartitions, "test-topic", numConsumers, 0);
+ List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
+ initialPartitions, "test-topic", numConsumers, 1);
+ List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
+ initialPartitions, "test-topic", numConsumers, 2);
+
+ assertNotNull(parts1);
+ assertNotNull(parts2);
+ assertNotNull(parts3);
+
+ assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
+ assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
+ assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
+ assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
+ assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
+ assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
+
+ for (TopicPartition p : parts1) {
+ // check that the element was actually contained
+ assertTrue(allInitialPartitions.remove(p.partition()));
+ }
+ for (TopicPartition p : parts2) {
+ // check that the element was actually contained
+ assertTrue(allInitialPartitions.remove(p.partition()));
+ }
+ for (TopicPartition p : parts3) {
+ // check that the element was actually contained
+ assertTrue(allInitialPartitions.remove(p.partition()));
+ }
+
+ // all partitions must have been assigned
+ assertTrue(allInitialPartitions.isEmpty());
+
+ // grow the set of partitions and distribute anew
+
+ List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
+ newPartitions, "test-topic", numConsumers, 0);
+ List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
+ newPartitions, "test-topic", numConsumers, 1);
+ List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
+ newPartitions, "test-topic", numConsumers, 2);
+
+ // new partitions must include all old partitions
+
+ assertTrue(parts1new.size() > parts1.size());
+ assertTrue(parts2new.size() > parts2.size());
+ assertTrue(parts3new.size() > parts3.size());
+
+ assertTrue(parts1new.containsAll(parts1));
+ assertTrue(parts2new.containsAll(parts2));
+ assertTrue(parts3new.containsAll(parts3));
+
+ assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
+ assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
+ assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
+ assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
+ assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
+ assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
+
+ for (TopicPartition p : parts1new) {
+ // check that the element was actually contained
+ assertTrue(allNewPartitions.remove(p.partition()));
+ }
+ for (TopicPartition p : parts2new) {
+ // check that the element was actually contained
+ assertTrue(allNewPartitions.remove(p.partition()));
+ }
+ for (TopicPartition p : parts3new) {
+ // check that the element was actually contained
+ assertTrue(allNewPartitions.remove(p.partition()));
+ }
+
+ // all partitions must have been assigned
+ assertTrue(allNewPartitions.isEmpty());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static boolean contains(int[] array, int value) {
+ for (int i : array) {
+ if (i == value) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
new file mode 100644
index 0000000..4949714
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerConfig;
+
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class KafkaConsumerTest {
+
+ @Test
+ public void testValidateZooKeeperConfig() {
+ try {
+ // empty
+ Properties emptyProperties = new Properties();
+ try {
+ FlinkKafkaConsumer.validateZooKeeperConfig(emptyProperties);
+ fail("should fail with an exception");
+ }
+ catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ // no connect string (only group string)
+ Properties noConnect = new Properties();
+ noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
+ try {
+ FlinkKafkaConsumer.validateZooKeeperConfig(noConnect);
+ fail("should fail with an exception");
+ }
+ catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ // no group string (only connect string)
+ Properties noGroup = new Properties();
+ noGroup.put("zookeeper.connect", "localhost:47574");
+ try {
+ FlinkKafkaConsumer.validateZooKeeperConfig(noGroup);
+ fail("should fail with an exception");
+ }
+ catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSnapshot() {
+ try {
+ Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
+ Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
+ Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
+
+ offsetsField.setAccessible(true);
+ runningField.setAccessible(true);
+ mapField.setAccessible(true);
+
+ FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
+ when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+
+ long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 };
+ LinkedMap map = new LinkedMap();
+
+ offsetsField.set(consumer, testOffsets);
+ runningField.set(consumer, true);
+ mapField.set(consumer, map);
+
+ assertTrue(map.isEmpty());
+
+ // make multiple checkpoints
+ for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
+ long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
+ assertArrayEquals(testOffsets, checkpoint);
+
+ // change the offsets, make sure the snapshot did not change
+ long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length);
+
+ for (int i = 0; i < testOffsets.length; i++) {
+ testOffsets[i] += 1L;
+ }
+
+ assertArrayEquals(checkpointCopy, checkpoint);
+
+ assertTrue(map.size() > 0);
+ assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ @Ignore("Kafka consumer internally makes an infinite loop")
+ public void testCreateSourceWithoutCluster() {
+ try {
+ Properties props = new Properties();
+ props.setProperty("zookeeper.connect", "localhost:56794");
+ props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
+ props.setProperty("group.id", "non-existent-group");
+
+ new FlinkKafkaConsumer<String>("no op topic", new JavaDefaultStringSchema(), props,
+ FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
+ FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}