Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/01 07:35:56 UTC
[05/14] flink git commit: [FLINK-4022] [kafka] Partition / topic
discovery for FlinkKafkaConsumer
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index cfd7c3b..3bed0b8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.SerializedValue;
import java.io.IOException;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -62,7 +64,18 @@ public abstract class AbstractFetcher<T, KPH> {
protected final Object checkpointLock;
/** All partitions (and their state) that this fetcher is subscribed to. */
- private final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates;
+ private final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates;
+
+ /**
+ * Queue of partitions that are not yet assigned to any Kafka clients for consuming.
+ * Kafka version-specific implementations of {@link AbstractFetcher#runFetchLoop()}
+ * should continuously poll this queue for unassigned partitions, and start consuming
+ * them accordingly.
+ *
+ * <p>All partitions added to this queue are guaranteed to have been added
+ * to {@link #subscribedPartitionStates} already.
+ */
+ protected final ClosableBlockingQueue<KafkaTopicPartitionState<KPH>> unassignedPartitionsQueue;
/** The mode describing whether the fetcher also generates timestamps and watermarks. */
protected final int timestampWatermarkMode;
@@ -70,6 +83,23 @@ public abstract class AbstractFetcher<T, KPH> {
/** Flag whether to register metrics for the fetcher. */
protected final boolean useMetrics;
+ /**
+ * Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+ * to exploit per-partition timestamp characteristics.
+ * The assigner is kept in serialized form so that it can be deserialized into multiple copies, one per partition.
+ */
+ private final SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic;
+
+ /**
+ * Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+ * to exploit per-partition timestamp characteristics.
+ * The assigner is kept in serialized form so that it can be deserialized into multiple copies, one per partition.
+ */
+ private final SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated;
+
+ /** User class loader used to deserialize watermark assigners. */
+ private final ClassLoader userCodeClassLoader;
+
/** Only relevant for punctuated watermarks: The current cross partition watermark. */
private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
@@ -77,7 +107,7 @@ public abstract class AbstractFetcher<T, KPH> {
protected AbstractFetcher(
SourceContext<T> sourceContext,
- Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+ Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
@@ -87,8 +117,11 @@ public abstract class AbstractFetcher<T, KPH> {
this.sourceContext = checkNotNull(sourceContext);
this.checkpointLock = sourceContext.getCheckpointLock();
this.useMetrics = useMetrics;
+ this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
// figure out what watermark mode we will be using
+ this.watermarksPeriodic = watermarksPeriodic;
+ this.watermarksPunctuated = watermarksPunctuated;
if (watermarksPeriodic == null) {
if (watermarksPunctuated == null) {
@@ -105,31 +138,69 @@ public abstract class AbstractFetcher<T, KPH> {
}
}
- // create our partition state according to the timestamp/watermark mode
- this.subscribedPartitionStates = initializeSubscribedPartitionStates(
- assignedPartitionsWithInitialOffsets,
+ this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
+
+ // initialize subscribed partition states with seed partitions
+ this.subscribedPartitionStates = createPartitionStateHolders(
+ seedPartitionsWithInitialOffsets,
timestampWatermarkMode,
- watermarksPeriodic, watermarksPunctuated,
+ watermarksPeriodic,
+ watermarksPunctuated,
userCodeClassLoader);
- // check that all partition states have a defined offset
+ // check that all seed partition states have a defined offset
for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
if (!partitionState.isOffsetDefined()) {
- throw new IllegalArgumentException("The fetcher was assigned partitions with undefined initial offsets.");
+ throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
}
}
+ // none of the seed partitions are assigned yet, so they should all be added to the unassigned partitions queue
+ for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
+ unassignedPartitionsQueue.add(partition);
+ }
+
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
- KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts =
- (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) subscribedPartitionStates;
+ @SuppressWarnings("unchecked")
+ PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
+ subscribedPartitionStates,
+ sourceContext,
+ processingTimeProvider,
+ autoWatermarkInterval);
- PeriodicWatermarkEmitter periodicEmitter =
- new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
periodicEmitter.start();
}
}
+ /**
+ * Adds a list of newly discovered partitions to the fetcher for consuming.
+ *
+ * <p>This method creates the partition state holder for each new partition, using
+ * {@link KafkaTopicPartitionStateSentinel#EARLIEST_OFFSET} as the starting offset.
+ * The earliest offset is used because there may be a delay between a partition being
+ * created (and starting to receive records) and the consumer discovering it.
+ *
+ * <p>After the state representation for a partition is created, it is added to the
+ * unassigned partitions queue to await consumption.
+ *
+ * @param newPartitions discovered partitions to add
+ */
+ public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) throws IOException, ClassNotFoundException {
+ List<KafkaTopicPartitionState<KPH>> newPartitionStates = createPartitionStateHolders(
+ newPartitions,
+ KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET,
+ timestampWatermarkMode,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ userCodeClassLoader);
+
+ for (KafkaTopicPartitionState<KPH> newPartitionState : newPartitionStates) {
+ subscribedPartitionStates.add(newPartitionState);
+ unassignedPartitionsQueue.add(newPartitionState);
+ }
+ }
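For illustration only (this sketch is not part of the patch): a Kafka-version-specific runFetchLoop() implementation is expected to drain the unassignedPartitionsQueue roughly as follows. The "running" flag and the reassignPartitions(...) helper are assumptions made for the example; only unassignedPartitionsQueue and its getBatchBlocking(long) method come from this code.

	// Hedged sketch of a version-specific fetch loop picking up seed and newly
	// discovered partitions from the queue populated above.
	@Override
	public void runFetchLoop() throws Exception {
		while (running) {
			// wait up to 100 ms for partitions enqueued by the constructor (seed
			// partitions) or by addDiscoveredPartitions() (newly discovered ones)
			List<KafkaTopicPartitionState<KPH>> newPartitions =
					unassignedPartitionsQueue.getBatchBlocking(100);

			if (!newPartitions.isEmpty()) {
				reassignPartitions(newPartitions); // assumed helper: hand the partitions to the Kafka client
			}

			// ... poll records for the currently assigned partitions and emit them ...
		}
	}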
+
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
@@ -139,7 +210,7 @@ public abstract class AbstractFetcher<T, KPH> {
*
* @return All subscribed partitions.
*/
- protected final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates() {
+ protected final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates() {
return subscribedPartitionStates;
}
@@ -156,15 +227,6 @@ public abstract class AbstractFetcher<T, KPH> {
// ------------------------------------------------------------------------
/**
- * Creates the Kafka version specific representation of the given
- * topic partition.
- *
- * @param partition The Flink representation of the Kafka topic partition.
- * @return The specific Kafka representation of the Kafka topic partition.
- */
- public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);
-
- /**
* Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
* older Kafka versions). This method is only ever called when the offset commit mode of
* the consumer is {@link OffsetCommitMode#ON_CHECKPOINTS}.
@@ -179,6 +241,15 @@ public abstract class AbstractFetcher<T, KPH> {
*/
public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+ /**
+ * Creates the Kafka version specific representation of the given
+ * topic partition.
+ *
+ * @param partition The Flink representation of the Kafka topic partition.
+ * @return The version-specific Kafka representation of the Kafka topic partition.
+ */
+ protected abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);
+
// ------------------------------------------------------------------------
// snapshot and restore the state
// ------------------------------------------------------------------------
@@ -186,7 +257,7 @@ public abstract class AbstractFetcher<T, KPH> {
/**
* Takes a snapshot of the partition offsets.
*
- * <p>Important: This method mus be called under the checkpoint lock.
+ * <p>Important: This method must be called under the checkpoint lock.
*
* @return A map from partition to current offset.
*/
@@ -194,8 +265,8 @@ public abstract class AbstractFetcher<T, KPH> {
// this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock);
- HashMap<KafkaTopicPartition, Long> state = new HashMap<>(subscribedPartitionStates.length);
- for (KafkaTopicPartitionState<?> partition : subscribedPartitionStates()) {
+ HashMap<KafkaTopicPartition, Long> state = new HashMap<>(subscribedPartitionStates.size());
+ for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
state.put(partition.getKafkaTopicPartition(), partition.getOffset());
}
return state;
@@ -365,78 +436,73 @@ public abstract class AbstractFetcher<T, KPH> {
/**
* Utility method that takes the topic partitions and creates the topic partition state
- * holders. If a watermark generator per partition exists, this will also initialize those.
+ * holders, depending on the timestamp / watermark mode.
*/
- private KafkaTopicPartitionState<KPH>[] initializeSubscribedPartitionStates(
- Map<KafkaTopicPartition, Long> assignedPartitionsToInitialOffsets,
+ private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(
+ Map<KafkaTopicPartition, Long> partitionsToInitialOffsets,
int timestampWatermarkMode,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
- switch (timestampWatermarkMode) {
- case NO_TIMESTAMPS_WATERMARKS: {
- @SuppressWarnings("unchecked")
- KafkaTopicPartitionState<KPH>[] partitions =
- (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitionsToInitialOffsets.size()];
+ List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
- int pos = 0;
- for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
+ switch (timestampWatermarkMode) {
+ case NO_TIMESTAMPS_WATERMARKS: {
+ for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
// create the kafka version specific partition handle
- KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
- partitions[pos] = new KafkaTopicPartitionState<>(partition.getKey(), kafkaHandle);
- partitions[pos].setOffset(partition.getValue());
+ KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
+
+ KafkaTopicPartitionState<KPH> partitionState =
+ new KafkaTopicPartitionState<>(partitionEntry.getKey(), kafkaHandle);
+ partitionState.setOffset(partitionEntry.getValue());
- pos++;
+ partitionStates.add(partitionState);
}
- return partitions;
+ return partitionStates;
}
case PERIODIC_WATERMARKS: {
- @SuppressWarnings("unchecked")
- KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
- (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
- new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()];
-
- int pos = 0;
- for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
- KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
+ for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
+ KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
AssignerWithPeriodicWatermarks<T> assignerInstance =
watermarksPeriodic.deserializeValue(userCodeClassLoader);
- partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
- partition.getKey(), kafkaHandle, assignerInstance);
- partitions[pos].setOffset(partition.getValue());
+ KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partitionState =
+ new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+ partitionEntry.getKey(),
+ kafkaHandle,
+ assignerInstance);
- pos++;
+ partitionState.setOffset(partitionEntry.getValue());
+
+ partitionStates.add(partitionState);
}
- return partitions;
+ return partitionStates;
}
case PUNCTUATED_WATERMARKS: {
- @SuppressWarnings("unchecked")
- KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
- (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
- new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()];
-
- int pos = 0;
- for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
- KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
+ for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
+ KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
AssignerWithPunctuatedWatermarks<T> assignerInstance =
watermarksPunctuated.deserializeValue(userCodeClassLoader);
- partitions[pos] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
- partition.getKey(), kafkaHandle, assignerInstance);
- partitions[pos].setOffset(partition.getValue());
+ KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partitionState =
+ new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+ partitionEntry.getKey(),
+ kafkaHandle,
+ assignerInstance);
+
+ partitionState.setOffset(partitionEntry.getValue());
- pos++;
+ partitionStates.add(partitionState);
}
- return partitions;
+ return partitionStates;
}
default:
// cannot happen, add this as a guard for the future
@@ -444,6 +510,31 @@ public abstract class AbstractFetcher<T, KPH> {
}
}
+ /**
+ * Shortcut variant of {@link #createPartitionStateHolders(Map, int, SerializedValue, SerializedValue, ClassLoader)}
+ * that uses the same offset for all partitions when creating their state holders.
+ */
+ private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(
+ List<KafkaTopicPartition> partitions,
+ long initialOffset,
+ int timestampWatermarkMode,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
+
+ Map<KafkaTopicPartition, Long> partitionsToInitialOffset = new HashMap<>(partitions.size());
+ for (KafkaTopicPartition partition : partitions) {
+ partitionsToInitialOffset.put(partition, initialOffset);
+ }
+
+ return createPartitionStateHolders(
+ partitionsToInitialOffset,
+ timestampWatermarkMode,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ userCodeClassLoader);
+ }
+
// ------------------------- Metrics ----------------------------------
/**
@@ -455,7 +546,7 @@ public abstract class AbstractFetcher<T, KPH> {
// add current offsets to gauge
MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
- for (KafkaTopicPartitionState<?> ktp: subscribedPartitionStates()) {
+ for (KafkaTopicPartitionState<KPH> ktp : subscribedPartitionStates) {
currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
}
@@ -500,9 +591,9 @@ public abstract class AbstractFetcher<T, KPH> {
* The periodic watermark emitter. In its given interval, it checks all partitions for
* the current event time watermark, and possibly emits the next watermark.
*/
- private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
+ private static class PeriodicWatermarkEmitter<KPH> implements ProcessingTimeCallback {
- private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
+ private final List<KafkaTopicPartitionState<KPH>> allPartitions;
private final SourceContext<?> emitter;
@@ -515,7 +606,7 @@ public abstract class AbstractFetcher<T, KPH> {
//-------------------------------------------------
PeriodicWatermarkEmitter(
- KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
+ List<KafkaTopicPartitionState<KPH>> allPartitions,
SourceContext<?> emitter,
ProcessingTimeService timerService,
long autoWatermarkInterval) {
@@ -536,14 +627,14 @@ public abstract class AbstractFetcher<T, KPH> {
public void onProcessingTime(long timestamp) throws Exception {
long minAcrossAll = Long.MAX_VALUE;
- for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
+ for (KafkaTopicPartitionState<?> state : allPartitions) {
// we access the current watermark for the periodic assigners under the state
// lock, to prevent concurrent modification to any internal variables
final long curr;
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (state) {
- curr = state.getCurrentWatermarkTimestamp();
+ curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>) state).getCurrentWatermarkTimestamp();
}
minAcrossAll = Math.min(minAcrossAll, curr);
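As a concrete illustration of the rule above (the watermark values are invented, and lastEmittedWatermark stands in for the emitter's bookkeeping field): with per-partition watermarks of 12 000, 15 000 and 9 500 ms, the candidate cross-partition watermark is their minimum, 9 500 ms, and it is only emitted if it advances past the previously emitted watermark.

	// Illustrative only: the slowest partition gates event-time progress.
	long minAcrossAll = Math.min(Math.min(12_000L, 15_000L), 9_500L); // = 9_500
	if (minAcrossAll > lastEmittedWatermark) {
		emitter.emitWatermark(new Watermark(minAcrossAll));
	}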
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
new file mode 100644
index 0000000..d55099a
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all partition discoverers.
+ *
+ * <p>This partition discoverer base class implements the logic for bookkeeping of
+ * discovered partitions, and uses that information to determine whether there
+ * are new partitions that the consumer subtask should subscribe to.
+ *
+ * <p>Subclass implementations should simply implement the logic of using the version-specific
+ * Kafka clients to fetch topic and partition metadata.
+ *
+ * <p>Since Kafka clients are generally not thread-safe, partition discoverers should
+ * not be concurrently accessed. The only exception to this is the {@link #wakeup()}
+ * call, which allows the discoverer to be interrupted during a {@link #discoverPartitions()} call.
+ */
+public abstract class AbstractPartitionDiscoverer {
+
+ /** Describes whether we are discovering partitions for fixed topics or a topic pattern. */
+ private final KafkaTopicsDescriptor topicsDescriptor;
+
+ /** Index of the consumer subtask that this partition discoverer belongs to. */
+ private final int indexOfThisSubtask;
+
+ /** The total number of consumer subtasks. */
+ private final int numParallelSubtasks;
+
+ /** Flag to determine whether or not the discoverer is closed. */
+ private volatile boolean closed = true;
+
+ /**
+ * Flag to determine whether or not the discoverer had been woken up.
+ * When set to {@code true}, {@link #discoverPartitions()} will be interrupted as early as possible.
+ * Once interrupted, the flag is reset.
+ */
+ private volatile boolean wakeup;
+
+ /**
+ * Map of topics to their largest discovered partition id seen by this subtask.
+ * This state may be updated whenever {@link AbstractPartitionDiscoverer#discoverPartitions()} or
+ * {@link AbstractPartitionDiscoverer#setAndCheckDiscoveredPartition(KafkaTopicPartition)} is called.
+ *
+ * <p>This is used to remove old partitions from the fetched partition lists. It is sufficient
+ * to keep track of only the largest partition id because Kafka partition ids can only
+ * increase and are assigned incrementally.
+ */
+ private Map<String, Integer> topicsToLargestDiscoveredPartitionId;
+
+ public AbstractPartitionDiscoverer(
+ KafkaTopicsDescriptor topicsDescriptor,
+ int indexOfThisSubtask,
+ int numParallelSubtasks) {
+
+ this.topicsDescriptor = checkNotNull(topicsDescriptor);
+ this.indexOfThisSubtask = indexOfThisSubtask;
+ this.numParallelSubtasks = numParallelSubtasks;
+ this.topicsToLargestDiscoveredPartitionId = new HashMap<>();
+ }
+
+ /**
+ * Opens the partition discoverer, initializing all required Kafka connections.
+ *
+ * <p>NOTE: thread-safety is not guaranteed.
+ */
+ public void open() throws Exception {
+ closed = false;
+ initializeConnections();
+ }
+
+ /**
+ * Closes the partition discoverer, cleaning up all Kafka connections.
+ *
+ * <p>NOTE: thread-safety is not guaranteed.
+ */
+ public void close() throws Exception {
+ closed = true;
+ closeConnections();
+ }
+
+ /**
+ * Interrupt an in-progress discovery attempt by throwing a {@link WakeupException}.
+ * If no attempt is in progress, the immediate next attempt will throw a {@link WakeupException}.
+ *
+ * <p>This method can be called concurrently from a different thread.
+ */
+ public void wakeup() {
+ wakeup = true;
+ wakeupConnections();
+ }
+
+ /**
+ * Execute a partition discovery attempt for this subtask.
+ * This method lets the partition discoverer update what partitions it has discovered so far.
+ *
+ * @return List of discovered new partitions that this subtask should subscribe to.
+ */
+ public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
+ if (!closed && !wakeup) {
+ try {
+ List<KafkaTopicPartition> newDiscoveredPartitions;
+
+ // (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
+ if (topicsDescriptor.isFixedTopics()) {
+ newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
+ } else {
+ List<String> matchedTopics = getAllTopics();
+
+ // retain topics that match the pattern
+ Iterator<String> iter = matchedTopics.iterator();
+ while (iter.hasNext()) {
+ if (!topicsDescriptor.getTopicPattern().matcher(iter.next()).matches()) {
+ iter.remove();
+ }
+ }
+
+ if (matchedTopics.size() != 0) {
+ // get partitions only for matched topics
+ newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
+ } else {
+ newDiscoveredPartitions = null;
+ }
+ }
+
+ // (2) eliminate partitions that were already discovered or that this subtask should not subscribe to
+ if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
+ throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
+ } else {
+ // sort so that we make sure the topicsToLargestDiscoveredPartitionId state is updated
+ // with incremental partition ids of the same topics (otherwise some partition ids may be skipped)
+ KafkaTopicPartition.sort(newDiscoveredPartitions);
+
+ Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
+ KafkaTopicPartition nextPartition;
+ while (iter.hasNext()) {
+ nextPartition = iter.next();
+ if (!setAndCheckDiscoveredPartition(nextPartition)) {
+ iter.remove();
+ }
+ }
+ }
+
+ return newDiscoveredPartitions;
+ } catch (WakeupException e) {
+ // the actual topic / partition metadata fetching methods
+ // may be woken up midway; reset the wakeup flag and rethrow
+ wakeup = false;
+ throw e;
+ }
+ } else if (!closed && wakeup) {
+ // may have been woken up before the method call
+ wakeup = false;
+ throw new WakeupException();
+ } else {
+ throw new ClosedException();
+ }
+ }
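To show how this method is meant to be driven (a hedged sketch, not code from this patch; the running flag, discoveryIntervalMillis and fetcher are assumed to exist in the surrounding consumer code), a consumer thread could periodically call discoverPartitions() and forward the result to the fetcher:

	// Hedged sketch of a periodic discovery loop.
	void runDiscoveryLoop(AbstractPartitionDiscoverer partitionDiscoverer) throws Exception {
		try {
			while (running) {
				List<KafkaTopicPartition> discovered = partitionDiscoverer.discoverPartitions();
				if (!discovered.isEmpty()) {
					// state holders are created with EARLIEST_OFFSET and queued for the fetch loop
					fetcher.addDiscoveredPartitions(discovered);
				}
				Thread.sleep(discoveryIntervalMillis);
			}
		} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
			// woken up or closed, typically because the consumer is shutting down; exit the loop
		}
	}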
+
+ /**
+ * Sets a partition as discovered. A partition is considered new if
+ * its partition id is larger than all partition ids previously
+ * seen for the topic it belongs to. Therefore, for a set of
+ * discovered partitions, the order in which this method is invoked
+ * for each partition is important.
+ *
+ * <p>If the partition is indeed newly discovered, this method also returns
+ * whether the new partition should be subscribed to by this subtask.
+ *
+ * @param partition the partition to set and check
+ *
+ * @return {@code true}, if the partition wasn't seen before and should
+ * be subscribed by this subtask; {@code false} otherwise
+ */
+ public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
+ if (isUndiscoveredPartition(partition)) {
+ topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
+
+ return shouldAssignToThisSubtask(partition, indexOfThisSubtask, numParallelSubtasks);
+ }
+
+ return false;
+ }
+
+ // ------------------------------------------------------------------------
+ // Kafka version specifics
+ // ------------------------------------------------------------------------
+
+ /** Establishes the required connections in order to fetch topic and partition metadata. */
+ protected abstract void initializeConnections() throws Exception;
+
+ /**
+ * Attempts to eagerly wake up from blocking calls to Kafka in {@link AbstractPartitionDiscoverer#getAllTopics()}
+ * and {@link AbstractPartitionDiscoverer#getAllPartitionsForTopics(List)}.
+ *
+ * <p>If the invocation indeed results in interrupting an actual blocking Kafka call, the implementations
+ * of {@link AbstractPartitionDiscoverer#getAllTopics()} and
+ * {@link AbstractPartitionDiscoverer#getAllPartitionsForTopics(List)} are responsible for throwing a
+ * {@link WakeupException}.
+ */
+ protected abstract void wakeupConnections();
+
+ /** Close all established connections. */
+ protected abstract void closeConnections() throws Exception;
+
+ /** Fetch the list of all topics from Kafka. */
+ protected abstract List<String> getAllTopics() throws WakeupException;
+
+ /** Fetch the list of all partitions for a specific list of topics from Kafka. */
+ protected abstract List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws WakeupException;
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /** Signaling exception to indicate that an actual Kafka call was interrupted. */
+ public static final class WakeupException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
+ /** Thrown if this discoverer was used to discover partitions after it was closed. */
+ public static final class ClosedException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
+ private boolean isUndiscoveredPartition(KafkaTopicPartition partition) {
+ return !topicsToLargestDiscoveredPartitionId.containsKey(partition.getTopic())
+ || partition.getPartition() > topicsToLargestDiscoveredPartitionId.get(partition.getTopic());
+ }
+
+ public static boolean shouldAssignToThisSubtask(KafkaTopicPartition partition, int indexOfThisSubtask, int numParallelSubtasks) {
+ return Math.abs(partition.hashCode() % numParallelSubtasks) == indexOfThisSubtask;
+ }
+}
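As an illustration of the assignment rule above (hedged example; the topic name and partition id are arbitrary): every subtask applies the same deterministic formula to a discovered partition, so exactly one subtask claims it without any coordination.

	// Illustrative check, not part of the patch.
	KafkaTopicPartition partition = new KafkaTopicPartition("example-topic", 7);
	int numParallelSubtasks = 3;
	for (int subtask = 0; subtask < numParallelSubtasks; subtask++) {
		boolean owned = AbstractPartitionDiscoverer.shouldAssignToThisSubtask(
				partition, subtask, numParallelSubtasks);
		// prints true for exactly one subtask: the one whose index equals
		// Math.abs(partition.hashCode() % numParallelSubtasks)
		System.out.println("subtask " + subtask + " owns partition: " + owned);
	}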
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
new file mode 100644
index 0000000..da61dd0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A special form of blocking queue with two additions:
+ * <ol>
+ * <li>The queue can be closed atomically when empty. Adding elements after the queue
+ * is closed fails. This allows queue consumers to atomically discover that no elements
+ * are available and mark themselves as shut down.</li>
+ * <li>The queue allows polling batches of elements in a single call.</li>
+ * </ol>
+ *
+ * <p>The queue has no capacity restriction and is safe for multiple producers and consumers.
+ *
+ * <p>Note: Null elements are prohibited.
+ *
+ * @param <E> The type of elements in the queue.
+ */
+public class ClosableBlockingQueue<E> {
+
+ /** The lock used to make queue accesses and open checks atomic. */
+ private final ReentrantLock lock;
+
+ /** The condition on which blocking get-calls wait if the queue is empty. */
+ private final Condition nonEmpty;
+
+ /** The deque of elements. */
+ private final ArrayDeque<E> elements;
+
+ /** Flag marking the status of the queue. */
+ private volatile boolean open;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new empty queue.
+ */
+ public ClosableBlockingQueue() {
+ this(10);
+ }
+
+ /**
+ * Creates a new empty queue, reserving space for at least the specified number
+ * of elements. The queue can still grow if more elements are added than the
+ * reserved space.
+ *
+ * @param initialSize The number of elements to reserve space for.
+ */
+ public ClosableBlockingQueue(int initialSize) {
+ this.lock = new ReentrantLock(true);
+ this.nonEmpty = this.lock.newCondition();
+
+ this.elements = new ArrayDeque<>(initialSize);
+ this.open = true;
+
+ }
+
+ /**
+ * Creates a new queue that contains the given elements.
+ *
+ * @param initialElements The elements to initially add to the queue.
+ */
+ public ClosableBlockingQueue(Collection<? extends E> initialElements) {
+ this(initialElements.size());
+ this.elements.addAll(initialElements);
+ }
+
+ // ------------------------------------------------------------------------
+ // Size and status
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the number of elements currently in the queue.
+ * @return The number of elements currently in the queue.
+ */
+ public int size() {
+ lock.lock();
+ try {
+ return elements.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Checks whether the queue is empty (has no elements).
+ * @return True, if the queue is empty; false, if it is non-empty.
+ */
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ /**
+ * Checks whether the queue is currently open, meaning elements can be added and polled.
+ * @return True, if the queue is open; false, if it is closed.
+ */
+ public boolean isOpen() {
+ return open;
+ }
+
+ /**
+ * Tries to close the queue. Closing the queue only succeeds when no elements are
+ * in the queue when this method is called. Checking whether the queue is empty, and
+ * marking the queue as closed is one atomic operation.
+ *
+ * @return True, if the queue is closed, false if the queue remains open.
+ */
+ public boolean close() {
+ lock.lock();
+ try {
+ if (open) {
+ if (elements.isEmpty()) {
+ open = false;
+ nonEmpty.signalAll();
+ return true;
+ } else {
+ return false;
+ }
+ }
+ else {
+ // already closed
+ return true;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Adding / Removing elements
+ // ------------------------------------------------------------------------
+
+ /**
+ * Tries to add an element to the queue, if the queue is still open. Checking whether the queue
+ * is open and adding the element is one atomic operation.
+ *
+ * <p>Unlike the {@link #add(Object)} method, this method never throws an exception,
+ * but only indicates via the return code if the element was added or the
+ * queue was closed.
+ *
+ * @param element The element to add.
+ * @return True, if the element was added, false if the queue was closed.
+ */
+ public boolean addIfOpen(E element) {
+ requireNonNull(element);
+
+ lock.lock();
+ try {
+ if (open) {
+ elements.addLast(element);
+ if (elements.size() == 1) {
+ nonEmpty.signalAll();
+ }
+ }
+ return open;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Adds the element to the queue, or fails with an exception, if the queue is closed.
+ * Checking whether the queue is open and adding the element is one atomic operation.
+ *
+ * @param element The element to add.
+ * @throws IllegalStateException Thrown, if the queue is closed.
+ */
+ public void add(E element) throws IllegalStateException {
+ requireNonNull(element);
+
+ lock.lock();
+ try {
+ if (open) {
+ elements.addLast(element);
+ if (elements.size() == 1) {
+ nonEmpty.signalAll();
+ }
+ } else {
+ throw new IllegalStateException("queue is closed");
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the queue's next element without removing it, if the queue is non-empty.
+ * Otherwise, returns null.
+ *
+ * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+ * Checking whether the queue is open and getting the next element is one atomic operation.
+ *
+ * <p>This method never blocks.
+ *
+ * @return The queue's next element, or null, if the queue is empty.
+ * @throws IllegalStateException Thrown, if the queue is closed.
+ */
+ public E peek() {
+ lock.lock();
+ try {
+ if (open) {
+ if (elements.size() > 0) {
+ return elements.getFirst();
+ } else {
+ return null;
+ }
+ } else {
+ throw new IllegalStateException("queue is closed");
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the queue's next element and removes it, if the queue is non-empty.
+ * Otherwise, this method returns null.
+ *
+ * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+ * Checking whether the queue is open and removing the next element is one atomic operation.
+ *
+ * <p>This method never blocks.
+ *
+ * @return The queue's next element, or null, if the queue is empty.
+ * @throws IllegalStateException Thrown, if the queue is closed.
+ */
+ public E poll() {
+ lock.lock();
+ try {
+ if (open) {
+ if (elements.size() > 0) {
+ return elements.removeFirst();
+ } else {
+ return null;
+ }
+ } else {
+ throw new IllegalStateException("queue is closed");
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns all of the queue's current elements in a list, if the queue is non-empty.
+ * Otherwise, this method returns null.
+ *
+ * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+ * Checking whether the queue is open and removing the elements is one atomic operation.
+ *
+ * <p>This method never blocks.
+ *
+ * @return All of the queue's elements, or null, if the queue is empty.
+ * @throws IllegalStateException Thrown, if the queue is closed.
+ */
+ public List<E> pollBatch() {
+ lock.lock();
+ try {
+ if (open) {
+ if (elements.size() > 0) {
+ ArrayList<E> result = new ArrayList<>(elements);
+ elements.clear();
+ return result;
+ } else {
+ return null;
+ }
+ } else {
+ throw new IllegalStateException("queue is closed");
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the next element in the queue. If the queue is empty, this method
+ * waits until at least one element is added.
+ *
+ * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+ * Checking whether the queue is open and removing the next element is one atomic operation.
+ *
+ * @return The next element in the queue, never null.
+ *
+ * @throws IllegalStateException Thrown, if the queue is closed.
+ * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
+ * element to be added.
+ */
+ public E getElementBlocking() throws InterruptedException {
+ lock.lock();
+ try {
+ while (open && elements.isEmpty()) {
+ nonEmpty.await();
+ }
+
+ if (open) {
+ return elements.removeFirst();
+ } else {
+ throw new IllegalStateException("queue is closed");
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the next element in the queue. If the queue is empty, this method
+ * waits at most a certain time until an element becomes available. If no element
+ * is available after that time, the method returns null.
+ *
+ * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+ * Checking whether the queue is open and removing the next element is one atomic operation.
+ *
+ * @param timeoutMillis The number of milliseconds to block, at most.
+ * @return The next element in the queue, or null, if the timeout expires before an element is available.
+ *
+ * @throws IllegalStateException Thrown, if the queue is closed.
+ * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
+ * element to be added.
+ */
+ public E getElementBlocking(long timeoutMillis) throws InterruptedException {
+ if (timeoutMillis == 0L) {
+ // wait forever case
+ return getElementBlocking();
+ } else if (timeoutMillis < 0L) {
+ throw new IllegalArgumentException("invalid timeout");
+ }
+
+ final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
+
+ lock.lock();
+ try {
+ while (open && elements.isEmpty() && timeoutMillis > 0) {
+ nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
+ timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
+ }
+
+ if (!open) {
+ throw new IllegalStateException("queue is closed");
+ }
+ else if (elements.isEmpty()) {
+ return null;
+ } else {
+ return elements.removeFirst();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Gets all the elements currently in the queue. If the queue is empty when this
+ * method is called, it blocks until at least one element is added.
+ *
+ * <p>This method always returns a list with at least one element.
+ *
+ * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+ * Checking whether the queue is open and removing the next element is one atomic operation.
+ *
+ * @return A list with all elements in the queue, always at least one element.
+ *
+ * @throws IllegalStateException Thrown, if the queue is closed.
+ * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
+ * element to be added.
+ */
+ public List<E> getBatchBlocking() throws InterruptedException {
+ lock.lock();
+ try {
+ while (open && elements.isEmpty()) {
+ nonEmpty.await();
+ }
+ if (open) {
+ ArrayList<E> result = new ArrayList<>(elements);
+ elements.clear();
+ return result;
+ } else {
+ throw new IllegalStateException("queue is closed");
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Gets all the elements currently in the queue, or blocks until at least one
+ * element is added. This method is similar to {@link #getBatchBlocking()}, but takes
+ * a maximum number of milliseconds to wait before returning.
+ *
+ * <p>This method never returns null; it returns an empty list if the queue is empty
+ * when the method is called and the timeout expires before an element is added.
+ *
+ * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+ * Checking whether the queue is open and removing the next element is one atomic operation.
+ *
+ * @param timeoutMillis The number of milliseconds to wait, at most.
+ * @return A list with all elements in the queue, possibly an empty list.
+ *
+ * @throws IllegalStateException Thrown, if the queue is closed.
+ * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
+ * element to be added.
+ */
+ public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException {
+ if (timeoutMillis == 0L) {
+ // wait forever case
+ return getBatchBlocking();
+ } else if (timeoutMillis < 0L) {
+ throw new IllegalArgumentException("invalid timeout");
+ }
+
+ final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
+
+ lock.lock();
+ try {
+ while (open && elements.isEmpty() && timeoutMillis > 0) {
+ nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
+ timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
+ }
+
+ if (!open) {
+ throw new IllegalStateException("queue is closed");
+ }
+ else if (elements.isEmpty()) {
+ return Collections.emptyList();
+ }
+ else {
+ ArrayList<E> result = new ArrayList<>(elements);
+ elements.clear();
+ return result;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Standard Utilities
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ int hashCode = 17;
+ for (E element : elements) {
+ hashCode = 31 * hashCode + element.hashCode();
+ }
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) {
+ @SuppressWarnings("unchecked")
+ ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj;
+
+ if (this.elements.size() == that.elements.size()) {
+ Iterator<E> thisElements = this.elements.iterator();
+ for (E thatNext : that.elements) {
+ E thisNext = thisElements.next();
+ if (!(thisNext == null ? thatNext == null : thisNext.equals(thatNext))) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return elements.toString();
+ }
+}
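A hedged usage sketch of the queue's producer/consumer contract (not taken from the patch; InterruptedException handling is omitted for brevity): the producer uses addIfOpen() so that adding after shutdown fails gracefully, the consumer drains batches, and close() succeeds only once the queue is empty, making shutdown atomic.

	ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();

	// producer side: addIfOpen() returns false instead of throwing once the queue is closed
	if (!queue.addIfOpen("record-1")) {
		// the consumer has closed the queue; stop producing
	}

	// consumer side: drain batches, waiting at most one second per attempt
	List<String> batch = queue.getBatchBlocking(1000);
	for (String element : batch) {
		// ... process element ...
	}

	// shutdown: close() only succeeds when the queue is empty, so no element is lost
	while (!queue.close()) {
		queue.pollBatch(); // drain leftovers, then retry closing
	}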
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
index f3645e3..3500cd8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka.internals;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -116,4 +117,22 @@ public final class KafkaTopicPartition implements Serializable {
}
return ret;
}
+
+ /**
+ * A {@link java.util.Comparator} for {@link KafkaTopicPartition}s.
+ */
+ public static class Comparator implements java.util.Comparator<KafkaTopicPartition> {
+ @Override
+ public int compare(KafkaTopicPartition p1, KafkaTopicPartition p2) {
+ if (!p1.getTopic().equals(p2.getTopic())) {
+ return p1.getTopic().compareTo(p2.getTopic());
+ } else {
+ return Integer.compare(p1.getPartition(), p2.getPartition());
+ }
+ }
+ }
+
+ public static void sort(List<KafkaTopicPartition> partitions) {
+ Collections.sort(partitions, new Comparator());
+ }
}
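A small hedged example of the ordering this comparator establishes (topic names are invented): partitions are grouped by topic and sorted by ascending partition id within each topic, which is exactly what the partition discoverer relies on when updating its largest-seen partition id per topic.

	// Illustrative only.
	List<KafkaTopicPartition> partitions = new ArrayList<>(Arrays.asList(
			new KafkaTopicPartition("topic-b", 1),
			new KafkaTopicPartition("topic-a", 2),
			new KafkaTopicPartition("topic-a", 0)));

	KafkaTopicPartition.sort(partitions);
	// resulting order: (topic-a, 0), (topic-a, 2), (topic-b, 1)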
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
new file mode 100644
index 0000000..9a81ea8
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A Kafka Topics Descriptor describes how the consumer subscribes to Kafka topics -
+ * either a fixed list of topics, or a topic pattern.
+ */
+public class KafkaTopicsDescriptor implements Serializable {
+
+ private static final long serialVersionUID = -3807227764764900975L;
+
+ private final List<String> fixedTopics;
+ private final Pattern topicPattern;
+
+ public KafkaTopicsDescriptor(@Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
+ checkArgument((fixedTopics != null && topicPattern == null) || (fixedTopics == null && topicPattern != null),
+ "Exactly one of either fixedTopics or topicPattern must be specified.");
+
+ if (fixedTopics != null) {
+ checkArgument(!fixedTopics.isEmpty(), "If subscribing to a fixed topics list, the supplied list cannot be empty.");
+ }
+
+ this.fixedTopics = fixedTopics;
+ this.topicPattern = topicPattern;
+ }
+
+ public boolean isFixedTopics() {
+ return fixedTopics != null;
+ }
+
+ public boolean isTopicPattern() {
+ return topicPattern != null;
+ }
+
+ public List<String> getFixedTopics() {
+ return fixedTopics;
+ }
+
+ public Pattern getTopicPattern() {
+ return topicPattern;
+ }
+
+ @Override
+ public String toString() {
+ return (fixedTopics == null)
+ ? "Topic Regex Pattern (" + topicPattern.pattern() + ")"
+ : "Fixed Topics (" + fixedTopics + ")";
+ }
+}
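Two hedged construction examples (the topic names and the pattern are invented) illustrating the "exactly one of the two" contract enforced in the constructor:

	// Fixed-topic subscription: the pattern argument must be null.
	KafkaTopicsDescriptor fixed =
			new KafkaTopicsDescriptor(Arrays.asList("orders", "payments"), null);
	// fixed.isFixedTopics() == true

	// Pattern subscription: the fixed-topics argument must be null.
	KafkaTopicsDescriptor byPattern =
			new KafkaTopicsDescriptor(null, Pattern.compile("metrics-.*"));
	// byPattern.isTopicPattern() == true; topics created later that match "metrics-.*"
	// become eligible for discovery without restarting the job.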
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 692dc1d..e3f337e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -28,8 +28,10 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
@@ -54,10 +56,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doAnswer;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.when;
/**
* Tests for checking whether {@link FlinkKafkaConsumerBase} can restore from snapshots that were
@@ -122,7 +128,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
- final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(fetcher, partitions);
+ final DummyFlinkKafkaConsumer<String> consumerFunction =
+ new DummyFlinkKafkaConsumer<>(fetcher, partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
new StreamSource<>(consumerFunction);
@@ -178,7 +185,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
@Test
public void testRestoreFromEmptyStateNoPartitions() throws Exception {
final DummyFlinkKafkaConsumer<String> consumerFunction =
- new DummyFlinkKafkaConsumer<>(Collections.<KafkaTopicPartition>emptyList());
+ new DummyFlinkKafkaConsumer<>(
+ Collections.<KafkaTopicPartition>emptyList(),
+ FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction);
@@ -203,7 +212,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
// assert that no state was restored
- assertTrue(consumerFunction.getRestoredState() == null);
+ assertTrue(consumerFunction.getRestoredState().isEmpty());
consumerOperator.close();
consumerOperator.cancel();
@@ -217,7 +226,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
public void testRestoreFromEmptyStateWithPartitions() throws Exception {
final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
- final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(partitions);
+ final DummyFlinkKafkaConsumer<String> consumerFunction =
+ new DummyFlinkKafkaConsumer<>(partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
new StreamSource<>(consumerFunction);
@@ -238,19 +248,25 @@ public class FlinkKafkaConsumerBaseMigrationTest {
testHarness.open();
- // the expected state in "kafka-consumer-migration-test-flink*-empty-state-snapshot";
- // since the state is empty, the consumer should reflect on the startup mode to determine start offsets.
+ // the expected state in "kafka-consumer-migration-test-flink1.2-snapshot-empty-state";
+ // all new partitions after the snapshot are considered as partitions that were created while the
+ // consumer wasn't running, and should start from the earliest offset.
final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
for (KafkaTopicPartition partition : PARTITION_STATE.keySet()) {
- expectedSubscribedPartitionsWithStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+ expectedSubscribedPartitionsWithStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
}
// assert that there are partitions and is identical to expected list
assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
- Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets());
- assertTrue(consumerFunction.getRestoredState() == null);
+ // the new partitions should have been considered as restored state
+ assertTrue(consumerFunction.getRestoredState() != null);
+ assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ for (Map.Entry<KafkaTopicPartition, Long> expectedEntry : expectedSubscribedPartitionsWithStartOffsets.entrySet()) {
+ assertEquals(expectedEntry.getValue(), consumerFunction.getRestoredState().get(expectedEntry.getKey()));
+ }
consumerOperator.close();
consumerOperator.cancel();
@@ -264,7 +280,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
public void testRestore() throws Exception {
final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
- final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(partitions);
+ final DummyFlinkKafkaConsumer<String> consumerFunction =
+ new DummyFlinkKafkaConsumer<>(partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
new StreamSource<>(consumerFunction);
@@ -290,16 +307,56 @@ public class FlinkKafkaConsumerBaseMigrationTest {
assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
// on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
- Assert.assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
// assert that state is correctly restored from legacy checkpoint
assertTrue(consumerFunction.getRestoredState() != null);
- Assert.assertEquals(PARTITION_STATE, consumerFunction.getRestoredState());
+ assertEquals(PARTITION_STATE, consumerFunction.getRestoredState());
consumerOperator.close();
consumerOperator.cancel();
}
+ /**
+ * Tests that restoring from savepoints taken before Flink 1.3 fails if partition discovery is enabled.
+ */
+ @Test
+ public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() throws Exception {
+ assumeTrue(testMigrateVersion == MigrationVersion.v1_1 || testMigrateVersion == MigrationVersion.v1_2);
+
+ final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
+
+ final DummyFlinkKafkaConsumer<String> consumerFunction =
+ new DummyFlinkKafkaConsumer<>(partitions, 1000L); // discovery enabled
+
+ StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
+ new StreamSource<>(consumerFunction);
+
+ final AbstractStreamOperatorTestHarness<String> testHarness =
+ new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
+
+ testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ testHarness.setup();
+
+ // restore state from binary snapshot file; should fail since discovery is enabled
+ try {
+ MigrationTestUtil.restoreFromSnapshot(
+ testHarness,
+ OperatorSnapshotUtil.getResourceFilename(
+ "kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"),
+ testMigrateVersion);
+
+ fail("Restore from savepoints from version before Flink 1.3.x should have failed if discovery is enabled.");
+ } catch (Exception e) {
+ if (testMigrateVersion == MigrationVersion.v1_1) {
+ Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+ } else {
+ Assert.assertTrue(e instanceof IllegalArgumentException);
+ }
+ }
+ }
+
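The new test only pins down the observable behaviour: an IllegalArgumentException, possibly wrapped, when discovery is enabled on pre-1.3 state. Conceptually, that failure could come from a guard along these lines; this is an illustrative sketch with made-up names (restoredFromPreFlink13State, discoveryIntervalMillis), not the actual check or message in FlinkKafkaConsumerBase:

    // reject discovery on top of pre-1.3 state, which carries no discovery bookkeeping
    if (restoredFromPreFlink13State && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
        throw new IllegalArgumentException(
            "Cannot enable topic / partition discovery when restoring from a savepoint taken before Flink 1.3.");
    }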
// ------------------------------------------------------------------------
private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
@@ -310,16 +367,23 @@ public class FlinkKafkaConsumerBaseMigrationTest {
private final AbstractFetcher<T, ?> fetcher;
@SuppressWarnings("unchecked")
- DummyFlinkKafkaConsumer(AbstractFetcher<T, ?> fetcher, List<KafkaTopicPartition> partitions) {
- super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class));
+ DummyFlinkKafkaConsumer(
+ AbstractFetcher<T, ?> fetcher,
+ List<KafkaTopicPartition> partitions,
+ long discoveryInterval) {
+
+ super(
+ Arrays.asList("dummy-topic"),
+ null,
+ (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class),
+ discoveryInterval);
+
this.fetcher = fetcher;
this.partitions = partitions;
}
- DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions) {
- super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class));
- this.fetcher = mock(AbstractFetcher.class);
- this.partitions = partitions;
+ DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions, long discoveryInterval) {
+ this(mock(AbstractFetcher.class), partitions, discoveryInterval);
}
@Override
@@ -334,8 +398,21 @@ public class FlinkKafkaConsumerBaseMigrationTest {
}
@Override
- protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
- return partitions;
+ protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+ KafkaTopicsDescriptor topicsDescriptor,
+ int indexOfThisSubtask,
+ int numParallelSubtasks) {
+
+ AbstractPartitionDiscoverer mockPartitionDiscoverer = mock(AbstractPartitionDiscoverer.class);
+
+ try {
+ when(mockPartitionDiscoverer.discoverPartitions()).thenReturn(partitions);
+ } catch (Exception e) {
+ // the stubbing call on the mock cannot actually throw; nothing to handle here
+ }
+ when(mockPartitionDiscoverer.setAndCheckDiscoveredPartition(any(KafkaTopicPartition.class))).thenReturn(true);
+
+ return mockPartitionDiscoverer;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index d673e8e..e0508ce 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -27,15 +27,17 @@ import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.apache.commons.collections.map.LinkedMap;
+
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
@@ -45,7 +47,6 @@ import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -133,8 +134,15 @@ public class FlinkKafkaConsumerBaseTest {
listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L));
FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+ when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+ when(context.getIndexOfThisSubtask()).thenReturn(0);
+ consumer.setRuntimeContext(context);
- when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+ // mock old 1.2 state (empty)
+ when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState<Serializable>());
+ // mock 1.3 state
+ when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
@@ -176,7 +184,10 @@ public class FlinkKafkaConsumerBaseTest {
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
TestingListState<Serializable> listState = new TestingListState<>();
- when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+ // mock old 1.2 state (empty)
+ when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState<Serializable>());
+ // mock 1.3 state
+ when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
@@ -192,34 +203,8 @@ public class FlinkKafkaConsumerBaseTest {
assertFalse(listState.get().iterator().hasNext());
}
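With the legacy handle and the 1.3 union handle both stubbed, the initialization path can be exercised for either source of state. As a hedged usage example (plain Mockito, not part of this patch), one could additionally verify that both handles were consulted once initializeState() returns:

    consumer.initializeState(initializationContext);

    // the consumer should have looked at the pre-1.3 handle as well as the 1.3 union handle
    verify(operatorStateStore).getSerializableListState(Matchers.any(String.class));
    verify(operatorStateStore).getUnionListState(Matchers.any(ListStateDescriptor.class));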
- /**
- * Tests that on snapshots, states and offsets to commit to Kafka are correct.
- */
- @SuppressWarnings("unchecked")
@Test
- public void checkUseFetcherWhenNoCheckpoint() throws Exception {
-
- FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
- List<KafkaTopicPartition> partitionList = new ArrayList<>(1);
- partitionList.add(new KafkaTopicPartition("test", 0));
- consumer.setSubscribedPartitions(partitionList);
-
- OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
- TestingListState<Serializable> listState = new TestingListState<>();
- when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
- StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
- when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-
- // make the context signal that there is no restored state, then validate that
- when(initializationContext.isRestored()).thenReturn(false);
- consumer.initializeState(initializationContext);
- consumer.run(mock(SourceFunction.SourceContext.class));
- }
-
- @Test
- public void testConfigureOnCheckpointsCommitMode() {
+ public void testConfigureOnCheckpointsCommitMode() throws Exception {
DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
consumer.setIsAutoCommitEnabled(true); // this should be ignored
@@ -235,7 +220,7 @@ public class FlinkKafkaConsumerBaseTest {
}
@Test
- public void testConfigureAutoCommitMode() {
+ public void testConfigureAutoCommitMode() throws Exception {
DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
consumer.setIsAutoCommitEnabled(true);
@@ -251,7 +236,7 @@ public class FlinkKafkaConsumerBaseTest {
}
@Test
- public void testConfigureDisableOffsetCommitWithCheckpointing() {
+ public void testConfigureDisableOffsetCommitWithCheckpointing() throws Exception {
DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
consumer.setIsAutoCommitEnabled(true); // this should be ignored
@@ -269,7 +254,7 @@ public class FlinkKafkaConsumerBaseTest {
}
@Test
- public void testConfigureDisableOffsetCommitWithoutCheckpointing() {
+ public void testConfigureDisableOffsetCommitWithoutCheckpointing() throws Exception {
DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
consumer.setIsAutoCommitEnabled(false);
@@ -321,8 +306,10 @@ public class FlinkKafkaConsumerBaseTest {
OperatorStateStore backend = mock(OperatorStateStore.class);
TestingListState<Serializable> listState = new TestingListState<>();
-
- when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+ // mock old 1.2 state (empty)
+ when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState<Serializable>());
+ // mock 1.3 state
+ when(backend.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
@@ -448,8 +435,10 @@ public class FlinkKafkaConsumerBaseTest {
OperatorStateStore backend = mock(OperatorStateStore.class);
TestingListState<Serializable> listState = new TestingListState<>();
-
- when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+ // mock old 1.2 state (empty)
+ when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState<Serializable>());
+ // mock 1.3 state
+ when(backend.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
@@ -567,7 +556,7 @@ public class FlinkKafkaConsumerBaseTest {
@SuppressWarnings("unchecked")
public DummyFlinkKafkaConsumer() {
- super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
+ super(Arrays.asList("dummy-topic"), null, (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class), 0);
}
@Override
@@ -583,8 +572,11 @@ public class FlinkKafkaConsumerBaseTest {
}
@Override
- protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
- return Collections.emptyList();
+ protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+ KafkaTopicsDescriptor topicsDescriptor,
+ int indexOfThisSubtask,
+ int numParallelSubtasks) {
+ return mock(AbstractPartitionDiscoverer.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
deleted file mode 100644
index 0be1d57..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests that the partition assignment is deterministic and stable.
- */
-public class KafkaConsumerPartitionAssignmentTest {
-
- @Test
- public void testPartitionsEqualConsumers() {
- try {
- List<KafkaTopicPartition> inPartitions = Arrays.asList(
- new KafkaTopicPartition("test-topic", 4),
- new KafkaTopicPartition("test-topic", 52),
- new KafkaTopicPartition("test-topic", 17),
- new KafkaTopicPartition("test-topic", 1));
-
- for (int i = 0; i < inPartitions.size(); i++) {
- Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets,
- inPartitions,
- i,
- inPartitions.size(),
- StartupMode.GROUP_OFFSETS,
- null);
-
- List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
-
- assertEquals(1, subscribedPartitions.size());
- assertTrue(contains(inPartitions, subscribedPartitions.get(0).getPartition()));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testMultiplePartitionsPerConsumers() {
- try {
- final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-
- final List<KafkaTopicPartition> partitions = new ArrayList<>();
- final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
-
- for (int p : partitionIDs) {
- KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
- partitions.add(part);
- allPartitions.add(part);
- }
-
- final int numConsumers = 3;
- final int minPartitionsPerConsumer = partitions.size() / numConsumers;
- final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
-
- for (int i = 0; i < numConsumers; i++) {
- Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets,
- partitions,
- i,
- numConsumers,
- StartupMode.GROUP_OFFSETS,
- null);
-
- List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
-
- assertTrue(subscribedPartitions.size() >= minPartitionsPerConsumer);
- assertTrue(subscribedPartitions.size() <= maxPartitionsPerConsumer);
-
- for (KafkaTopicPartition p : subscribedPartitions) {
- // check that the element was actually contained
- assertTrue(allPartitions.remove(p));
- }
- }
-
- // all partitions must have been assigned
- assertTrue(allPartitions.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPartitionsFewerThanConsumers() {
- try {
- List<KafkaTopicPartition> inPartitions = Arrays.asList(
- new KafkaTopicPartition("test-topic", 4),
- new KafkaTopicPartition("test-topic", 52),
- new KafkaTopicPartition("test-topic", 17),
- new KafkaTopicPartition("test-topic", 1));
-
- final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
- allPartitions.addAll(inPartitions);
-
- final int numConsumers = 2 * inPartitions.size() + 3;
-
- for (int i = 0; i < numConsumers; i++) {
- Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets,
- inPartitions,
- i,
- numConsumers,
- StartupMode.GROUP_OFFSETS,
- null);
-
- List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
-
- assertTrue(subscribedPartitions.size() <= 1);
-
- for (KafkaTopicPartition p : subscribedPartitions) {
- // check that the element was actually contained
- assertTrue(allPartitions.remove(p));
- }
- }
-
- // all partitions must have been assigned
- assertTrue(allPartitions.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testAssignEmptyPartitions() {
- try {
- List<KafkaTopicPartition> ep = new ArrayList<>();
- Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets,
- ep,
- 2,
- 4,
- StartupMode.GROUP_OFFSETS,
- null);
- assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
-
- subscribedPartitionsToStartOffsets = new HashMap<>();
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets,
- ep,
- 0,
- 1,
- StartupMode.GROUP_OFFSETS,
- null);
- assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testGrowingPartitionsRemainsStable() {
- try {
- final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
- List<KafkaTopicPartition> newPartitions = new ArrayList<>();
-
- for (int p : newPartitionIDs) {
- KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
- newPartitions.add(part);
- }
-
- List<KafkaTopicPartition> initialPartitions = newPartitions.subList(0, 7);
-
- final Set<KafkaTopicPartition> allNewPartitions = new HashSet<>(newPartitions);
- final Set<KafkaTopicPartition> allInitialPartitions = new HashSet<>(initialPartitions);
-
- final int numConsumers = 3;
- final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
- final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1;
- final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
- final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
-
- Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets1 = new HashMap<>();
- Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets2 = new HashMap<>();
- Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets3 = new HashMap<>();
-
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets1,
- initialPartitions,
- 0,
- numConsumers,
- StartupMode.GROUP_OFFSETS,
- null);
-
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets2,
- initialPartitions,
- 1,
- numConsumers,
- StartupMode.GROUP_OFFSETS,
- null);
-
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets3,
- initialPartitions,
- 2,
- numConsumers,
- StartupMode.GROUP_OFFSETS,
- null);
-
- List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
- List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
- List<KafkaTopicPartition> subscribedPartitions3 = new ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
-
- assertTrue(subscribedPartitions1.size() >= minInitialPartitionsPerConsumer);
- assertTrue(subscribedPartitions1.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(subscribedPartitions2.size() >= minInitialPartitionsPerConsumer);
- assertTrue(subscribedPartitions2.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(subscribedPartitions3.size() >= minInitialPartitionsPerConsumer);
- assertTrue(subscribedPartitions3.size() <= maxInitialPartitionsPerConsumer);
-
- for (KafkaTopicPartition p : subscribedPartitions1) {
- // check that the element was actually contained
- assertTrue(allInitialPartitions.remove(p));
- }
-
- for (KafkaTopicPartition p : subscribedPartitions2) {
- // check that the element was actually contained
- assertTrue(allInitialPartitions.remove(p));
- }
-
- for (KafkaTopicPartition p : subscribedPartitions3) {
- // check that the element was actually contained
- assertTrue(allInitialPartitions.remove(p));
- }
-
- // all partitions must have been assigned
- assertTrue(allInitialPartitions.isEmpty());
-
- // grow the set of partitions and distribute anew
-
- subscribedPartitionsToStartOffsets1 = new HashMap<>();
- subscribedPartitionsToStartOffsets2 = new HashMap<>();
- subscribedPartitionsToStartOffsets3 = new HashMap<>();
-
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets1,
- newPartitions,
- 0,
- numConsumers,
- StartupMode.GROUP_OFFSETS,
- null);
-
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets2,
- newPartitions,
- 1,
- numConsumers,
- StartupMode.GROUP_OFFSETS,
- null);
-
- FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
- subscribedPartitionsToStartOffsets3,
- newPartitions,
- 2,
- numConsumers,
- StartupMode.GROUP_OFFSETS,
- null);
-
- List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
- List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
- List<KafkaTopicPartition> subscribedPartitions3New = new ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
-
- // new partitions must include all old partitions
-
- assertTrue(subscribedPartitions1New.size() > subscribedPartitions1.size());
- assertTrue(subscribedPartitions2New.size() > subscribedPartitions2.size());
- assertTrue(subscribedPartitions3New.size() > subscribedPartitions3.size());
-
- assertTrue(subscribedPartitions1New.containsAll(subscribedPartitions1));
- assertTrue(subscribedPartitions2New.containsAll(subscribedPartitions2));
- assertTrue(subscribedPartitions3New.containsAll(subscribedPartitions3));
-
- assertTrue(subscribedPartitions1New.size() >= minNewPartitionsPerConsumer);
- assertTrue(subscribedPartitions1New.size() <= maxNewPartitionsPerConsumer);
- assertTrue(subscribedPartitions2New.size() >= minNewPartitionsPerConsumer);
- assertTrue(subscribedPartitions2New.size() <= maxNewPartitionsPerConsumer);
- assertTrue(subscribedPartitions3New.size() >= minNewPartitionsPerConsumer);
- assertTrue(subscribedPartitions3New.size() <= maxNewPartitionsPerConsumer);
-
- for (KafkaTopicPartition p : subscribedPartitions1New) {
- // check that the element was actually contained
- assertTrue(allNewPartitions.remove(p));
- }
- for (KafkaTopicPartition p : subscribedPartitions2New) {
- // check that the element was actually contained
- assertTrue(allNewPartitions.remove(p));
- }
- for (KafkaTopicPartition p : subscribedPartitions3New) {
- // check that the element was actually contained
- assertTrue(allNewPartitions.remove(p));
- }
-
- // all partitions must have been assigned
- assertTrue(allNewPartitions.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
- for (KafkaTopicPartition ktp : inPartitions) {
- if (ktp.getPartition() == partition) {
- return true;
- }
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 03a23f5..27e9ccd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -187,7 +186,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
RuntimeException re = (RuntimeException) jee.getCause();
- assertTrue(re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
+ assertTrue(re.getMessage().contains("Unable to retrieve any partitions"));
}
}
}
@@ -1625,7 +1624,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
});
- JobExecutionResult result = tryExecute(env1, "Consume " + elementCount + " elements from Kafka");
+ tryExecute(env1, "Consume " + elementCount + " elements from Kafka");
deleteTestTopic(topic);
}