Posted to commits@flink.apache.org by se...@apache.org on 2016/04/13 10:31:06 UTC
[08/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
new file mode 100644
index 0000000..99c5d69
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a periodic watermark generator (and timestamp extractor) per partition.
+ *
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+
+ /** The timestamp assigner and watermark generator for the partition */
+ private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
+
+ /** The last watermark timestamp generated by this partition */
+ private long partitionWatermark;
+
+ // ------------------------------------------------------------------------
+
+ public KafkaTopicPartitionStateWithPeriodicWatermarks(
+ KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+ AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
+ {
+ super(partition, kafkaPartitionHandle);
+
+ this.timestampsAndWatermarks = timestampsAndWatermarks;
+ this.partitionWatermark = Long.MIN_VALUE;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public long getTimestampForRecord(T record) {
+ return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE);
+ }
+
+ public long getCurrentWatermarkTimestamp() {
+ Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
+ if (wm != null) {
+ partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
+ }
+ return partitionWatermark;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()
+ + ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
+ }
+}
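For orientation, a minimal usage sketch (not part of this commit; the fetcher wiring, the String record type, and the field names are assumptions): a fetcher holding one such state object per partition can combine the cached per-partition watermarks into the operator-wide watermark by taking the minimum across partitions, emitting only when that minimum advances.

    // Sketch: combine per-partition watermarks into one source watermark.
    static long emitCombinedWatermark(
            KafkaTopicPartitionStateWithPeriodicWatermarks<String, ?>[] partitions,
            long lastEmittedWatermark,
            org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx) {

        long minAcrossAll = Long.MAX_VALUE;
        for (KafkaTopicPartitionStateWithPeriodicWatermarks<String, ?> partition : partitions) {
            // getCurrentWatermarkTimestamp() remembers the max watermark seen per partition
            minAcrossAll = Math.min(minAcrossAll, partition.getCurrentWatermarkTimestamp());
        }
        if (minAcrossAll > lastEmittedWatermark) {
            // the combined watermark advanced, so it is safe to emit it
            ctx.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(minAcrossAll));
            return minAcrossAll;
        }
        return lastEmittedWatermark;
    }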
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
new file mode 100644
index 0000000..b265990
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a punctuated watermark generator (and timestamp extractor) per partition.
+ *
+ * <p>This class is not thread-safe, but it gives volatile access to the current
+ * partition watermark ({@link #getCurrentPartitionWatermark()}).
+ *
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
+ */
+public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+
+ /** The timestamp assigner and watermark generator for the partition */
+ private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks;
+
+ /** The last watermark timestamp generated by this partition */
+ private volatile long partitionWatermark;
+
+ // ------------------------------------------------------------------------
+
+ public KafkaTopicPartitionStateWithPunctuatedWatermarks(
+ KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+ AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks)
+ {
+ super(partition, kafkaPartitionHandle);
+
+ this.timestampsAndWatermarks = timestampsAndWatermarks;
+ this.partitionWatermark = Long.MIN_VALUE;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public long getTimestampForRecord(T record) {
+ return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE);
+ }
+
+ @Nullable
+ public Watermark checkAndGetNewWatermark(T record, long timestamp) {
+ Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
+ if (mark != null && mark.getTimestamp() > partitionWatermark) {
+ partitionWatermark = mark.getTimestamp();
+ return mark;
+ }
+ else {
+ return null;
+ }
+ }
+
+ public long getCurrentPartitionWatermark() {
+ return partitionWatermark;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition()
+ + ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
+ }
+}
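The per-record flow this class supports, sketched under the same caveats (hypothetical fetcher loop, String records): extract the record's timestamp, ask the assigner whether this record punctuates a watermark, and if so emit the minimum watermark across all partitions.

    // Sketch: punctuated watermark handling for one record of one partition.
    static void processRecord(
            String record,
            KafkaTopicPartitionStateWithPunctuatedWatermarks<String, ?> partition,
            KafkaTopicPartitionStateWithPunctuatedWatermarks<String, ?>[] allPartitions,
            org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx) {

        long timestamp = partition.getTimestampForRecord(record);
        ctx.collectWithTimestamp(record, timestamp);

        org.apache.flink.streaming.api.watermark.Watermark mark =
                partition.checkAndGetNewWatermark(record, timestamp);
        if (mark != null) {
            // this partition advanced, but the emitted watermark stays bounded by
            // the slowest partition (a real fetcher would also deduplicate against
            // the last emitted watermark)
            long min = Long.MAX_VALUE;
            for (KafkaTopicPartitionStateWithPunctuatedWatermarks<String, ?> p : allPartitions) {
                min = Math.min(min, p.getCurrentPartitionWatermark());
            }
            ctx.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(min));
        }
    }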
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 038f414..37e2ef6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kafka.partitioner;
+package org.apache.flink.streaming.connectors.kafka.partitioner;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
index 1be6b00..bda90bd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kafka.util;
+package org.apache.flink.streaming.connectors.kafka.util;
import java.util.Properties;
@@ -38,10 +38,9 @@ public class KafkaUtils {
"Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'");
}
}
-
- public static void checkArgument(boolean arg) {
- if(!arg) {
- throw new IllegalArgumentException();
- }
- }
+
+ // ------------------------------------------------------------------------
+
+ /** Private default constructor to prevent instantiation */
+ private KafkaUtils() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
new file mode 100644
index 0000000..f4ef995
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class FlinkKafkaConsumerBaseTest {
+
+ /**
+ * Tests that the two kinds of timestamp extractors / watermark generators cannot be used together.
+ */
+ @Test
+ public void testEitherWatermarkExtractor() {
+ try {
+ new DummyFlinkKafkaConsumer<>().setPeriodicWatermarkEmitter(null);
+ fail();
+ } catch (NullPointerException ignored) {}
+
+ try {
+ new DummyFlinkKafkaConsumer<>().setPunctuatedWatermarkEmitter(null);
+ fail();
+ } catch (NullPointerException ignored) {}
+
+ @SuppressWarnings("unchecked")
+ final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
+ @SuppressWarnings("unchecked")
+ final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
+
+ DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
+ c1.setPeriodicWatermarkEmitter(periodicAssigner);
+ try {
+ c1.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+ fail();
+ } catch (IllegalStateException ignored) {}
+
+ DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
+ c2.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+ try {
+ c2.setPeriodicWatermarkEmitter(periodicAssigner);
+ fail();
+ } catch (IllegalStateException ignored) {}
+ }
+
+ /**
+ * Tests that no checkpoints happen when the fetcher is not running.
+ */
+ @Test
+ public void ignoreCheckpointWhenNotRunning() throws Exception {
+ @SuppressWarnings("unchecked")
+ final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+ FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
+ assertNull(consumer.snapshotState(17L, 23L));
+ consumer.notifyCheckpointComplete(66L);
+ }
+
+ /**
+ * Tests that the restored state is returned as the snapshot while the fetcher is not yet ready.
+ */
+ @Test
+ public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
+ HashMap<KafkaTopicPartition, Long> restoreState = new HashMap<>();
+ restoreState.put(new KafkaTopicPartition("abc", 13), 16768L);
+ restoreState.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+ FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+ consumer.restoreState(restoreState);
+
+ assertEquals(restoreState, consumer.snapshotState(17L, 23L));
+ }
+
+ /**
+ * Tests that the snapshot is null when no state was restored and the fetcher is not yet ready.
+ */
+ @Test
+ public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
+ FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+ assertNull(consumer.snapshotState(17L, 23L));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSnapshotState() throws Exception {
+ final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
+ state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+ state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+ final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
+ state2.put(new KafkaTopicPartition("abc", 13), 16770L);
+ state2.put(new KafkaTopicPartition("def", 7), 987654329L);
+
+ final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
+ state2.put(new KafkaTopicPartition("abc", 13), 16780L);
+ state2.put(new KafkaTopicPartition("def", 7), 987654377L);
+
+ final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+ when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+
+ final LinkedMap pendingCheckpoints = new LinkedMap();
+
+ FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingCheckpoints, true);
+ assertEquals(0, pendingCheckpoints.size());
+
+ // checkpoint 1
+ HashMap<KafkaTopicPartition, Long> snapshot1 = consumer.snapshotState(138L, 19L);
+ assertEquals(state1, snapshot1);
+ assertEquals(1, pendingCheckpoints.size());
+ assertEquals(state1, pendingCheckpoints.get(138L));
+
+ // checkpoint 2
+ HashMap<KafkaTopicPartition, Long> snapshot2 = consumer.snapshotState(140L, 1578L);
+ assertEquals(state2, snapshot2);
+ assertEquals(2, pendingCheckpoints.size());
+ assertEquals(state2, pendingCheckpoints.get(140L));
+
+ // ack checkpoint 1
+ consumer.notifyCheckpointComplete(138L);
+ assertEquals(1, pendingCheckpoints.size());
+ assertTrue(pendingCheckpoints.containsKey(140L));
+
+ // checkpoint 3
+ HashMap<KafkaTopicPartition, Long> snapshot3 = consumer.snapshotState(141L, 1578L);
+ assertEquals(state3, snapshot3);
+ assertEquals(2, pendingCheckpoints.size());
+ assertEquals(state3, pendingCheckpoints.get(141L));
+
+ // ack checkpoint 3, subsumes number 2
+ consumer.notifyCheckpointComplete(141L);
+ assertEquals(0, pendingCheckpoints.size());
+
+
+ consumer.notifyCheckpointComplete(666); // invalid checkpoint
+ assertEquals(0, pendingCheckpoints.size());
+
+ // create 500 snapshots
+ for (int i = 100; i < 600; i++) {
+ consumer.snapshotState(i, 15 * i);
+ }
+ assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+
+ // commit only the second last
+ consumer.notifyCheckpointComplete(598);
+ assertEquals(1, pendingCheckpoints.size());
+
+ // access invalid checkpoint
+ consumer.notifyCheckpointComplete(590);
+
+ // and the last
+ consumer.notifyCheckpointComplete(599);
+ assertEquals(0, pendingCheckpoints.size());
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static <T> FlinkKafkaConsumerBase<T> getConsumer(
+ AbstractFetcher<T, ?> fetcher, LinkedMap pendingCheckpoints, boolean running) throws Exception
+ {
+ FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+
+ Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
+ fetcherField.setAccessible(true);
+ fetcherField.set(consumer, fetcher);
+
+ Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
+ mapField.setAccessible(true);
+ mapField.set(consumer, pendingCheckpoints);
+
+ Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
+ runningField.setAccessible(true);
+ runningField.set(consumer, running);
+
+ return consumer;
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
+ private static final long serialVersionUID = 1L;
+
+ @SuppressWarnings("unchecked")
+ public DummyFlinkKafkaConsumer() {
+ super((KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+ }
+
+ @Override
+ protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
+ return null;
+ }
+ }
+}
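For reference, the ack-side bookkeeping that testSnapshotState above exercises can be sketched as follows (a sketch only, assuming pendingCheckpoints is the LinkedMap of checkpoint id to offsets in checkpoint order; the real method additionally commits the offsets):

    // Sketch: on checkpoint completion, take the acked entry and drop all
    // older pending checkpoints, since the acked one subsumes them.
    static Object onCheckpointComplete(
            org.apache.commons.collections.map.LinkedMap pendingCheckpoints, long checkpointId) {
        final int posInMap = pendingCheckpoints.indexOf(checkpointId);
        if (posInMap == -1) {
            return null; // unknown or already subsumed checkpoint - ignored, as the test expects
        }
        Object offsetsToCommit = pendingCheckpoints.remove(posInMap);
        for (int i = 0; i < posInMap; i++) {
            pendingCheckpoints.remove(0); // drop subsumed (older) checkpoints
        }
        return offsetsToCommit;
    }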
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index e86d51a..9beed22 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -18,10 +18,8 @@
package org.apache.flink.streaming.connectors.kafka;
-
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import org.apache.kafka.common.Node;
+
import org.junit.Test;
import java.util.ArrayList;
@@ -32,30 +30,27 @@ import java.util.Set;
import static org.junit.Assert.*;
-
/**
* Tests that the partition assignment is deterministic and stable.
*/
public class KafkaConsumerPartitionAssignmentTest {
- private final Node fake = new Node(1337, "localhost", 1337);
-
@Test
public void testPartitionsEqualConsumers() {
try {
- List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
- inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
- inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
- inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
- inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
+ List<KafkaTopicPartition> inPartitions = Arrays.asList(
+ new KafkaTopicPartition("test-topic", 4),
+ new KafkaTopicPartition("test-topic", 52),
+ new KafkaTopicPartition("test-topic", 17),
+ new KafkaTopicPartition("test-topic", 1));
for (int i = 0; i < inPartitions.size(); i++) {
- List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(
- inPartitions, inPartitions.size(), i);
+ List<KafkaTopicPartition> parts =
+ FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
assertNotNull(parts);
assertEquals(1, parts.size());
- assertTrue(contains(inPartitions, parts.get(0).getTopicPartition().getPartition()));
+ assertTrue(contains(inPartitions, parts.get(0).getPartition()));
}
}
catch (Exception e) {
@@ -64,9 +59,9 @@ public class KafkaConsumerPartitionAssignmentTest {
}
}
- private boolean contains(List<KafkaTopicPartitionLeader> inPartitions, int partition) {
- for (KafkaTopicPartitionLeader ktp: inPartitions) {
- if (ktp.getTopicPartition().getPartition() == partition) {
+ private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
+ for (KafkaTopicPartition ktp : inPartitions) {
+ if (ktp.getPartition() == partition) {
return true;
}
}
@@ -78,11 +73,11 @@ public class KafkaConsumerPartitionAssignmentTest {
try {
final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
- final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
- final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+ final List<KafkaTopicPartition> partitions = new ArrayList<>();
+ final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
for (int p : partitionIDs) {
- KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+ KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
partitions.add(part);
allPartitions.add(part);
}
@@ -92,13 +87,14 @@ public class KafkaConsumerPartitionAssignmentTest {
final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
for (int i = 0; i < numConsumers; i++) {
- List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+ List<KafkaTopicPartition> parts =
+ FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
assertNotNull(parts);
assertTrue(parts.size() >= minPartitionsPerConsumer);
assertTrue(parts.size() <= maxPartitionsPerConsumer);
- for (KafkaTopicPartitionLeader p : parts) {
+ for (KafkaTopicPartition p : parts) {
// check that the element was actually contained
assertTrue(allPartitions.remove(p));
}
@@ -116,24 +112,24 @@ public class KafkaConsumerPartitionAssignmentTest {
@Test
public void testPartitionsFewerThanConsumers() {
try {
- List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
- inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
- inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
- inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
- inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
+ List<KafkaTopicPartition> inPartitions = Arrays.asList(
+ new KafkaTopicPartition("test-topic", 4),
+ new KafkaTopicPartition("test-topic", 52),
+ new KafkaTopicPartition("test-topic", 17),
+ new KafkaTopicPartition("test-topic", 1));
- final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+ final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
allPartitions.addAll(inPartitions);
final int numConsumers = 2 * inPartitions.size() + 3;
for (int i = 0; i < numConsumers; i++) {
- List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+ List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
assertNotNull(parts);
assertTrue(parts.size() <= 1);
- for (KafkaTopicPartitionLeader p : parts) {
+ for (KafkaTopicPartition p : parts) {
// check that the element was actually contained
assertTrue(allPartitions.remove(p));
}
@@ -151,12 +147,12 @@ public class KafkaConsumerPartitionAssignmentTest {
@Test
public void testAssignEmptyPartitions() {
try {
- List<KafkaTopicPartitionLeader> ep = new ArrayList<>();
- List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
+ List<KafkaTopicPartition> ep = new ArrayList<>();
+ List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
assertNotNull(parts1);
assertTrue(parts1.isEmpty());
- List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
+ List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
assertNotNull(parts2);
assertTrue(parts2.isEmpty());
}
@@ -170,17 +166,17 @@ public class KafkaConsumerPartitionAssignmentTest {
public void testGrowingPartitionsRemainsStable() {
try {
final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
- List<KafkaTopicPartitionLeader> newPartitions = new ArrayList<>();
+ List<KafkaTopicPartition> newPartitions = new ArrayList<>();
for (int p : newPartitionIDs) {
- KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+ KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
newPartitions.add(part);
}
- List<KafkaTopicPartitionLeader> initialPartitions = newPartitions.subList(0, 7);
+ List<KafkaTopicPartition> initialPartitions = newPartitions.subList(0, 7);
- final Set<KafkaTopicPartitionLeader> allNewPartitions = new HashSet<>(newPartitions);
- final Set<KafkaTopicPartitionLeader> allInitialPartitions = new HashSet<>(initialPartitions);
+ final Set<KafkaTopicPartition> allNewPartitions = new HashSet<>(newPartitions);
+ final Set<KafkaTopicPartition> allInitialPartitions = new HashSet<>(initialPartitions);
final int numConsumers = 3;
final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
@@ -188,11 +184,11 @@ public class KafkaConsumerPartitionAssignmentTest {
final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
- List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(
+ List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(
initialPartitions, numConsumers, 0);
- List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(
+ List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(
initialPartitions, numConsumers, 1);
- List<KafkaTopicPartitionLeader> parts3 = FlinkKafkaConsumerBase.assignPartitions(
+ List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions(
initialPartitions, numConsumers, 2);
assertNotNull(parts1);
@@ -206,15 +202,15 @@ public class KafkaConsumerPartitionAssignmentTest {
assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
- for (KafkaTopicPartitionLeader p : parts1) {
+ for (KafkaTopicPartition p : parts1) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p));
}
- for (KafkaTopicPartitionLeader p : parts2) {
+ for (KafkaTopicPartition p : parts2) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p));
}
- for (KafkaTopicPartitionLeader p : parts3) {
+ for (KafkaTopicPartition p : parts3) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p));
}
@@ -224,11 +220,11 @@ public class KafkaConsumerPartitionAssignmentTest {
// grow the set of partitions and distribute anew
- List<KafkaTopicPartitionLeader> parts1new = FlinkKafkaConsumerBase.assignPartitions(
+ List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions(
newPartitions, numConsumers, 0);
- List<KafkaTopicPartitionLeader> parts2new = FlinkKafkaConsumerBase.assignPartitions(
+ List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions(
newPartitions, numConsumers, 1);
- List<KafkaTopicPartitionLeader> parts3new = FlinkKafkaConsumerBase.assignPartitions(
+ List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions(
newPartitions, numConsumers, 2);
// new partitions must include all old partitions
@@ -248,15 +244,15 @@ public class KafkaConsumerPartitionAssignmentTest {
assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
- for (KafkaTopicPartitionLeader p : parts1new) {
+ for (KafkaTopicPartition p : parts1new) {
// check that the element was actually contained
assertTrue(allNewPartitions.remove(p));
}
- for (KafkaTopicPartitionLeader p : parts2new) {
+ for (KafkaTopicPartition p : parts2new) {
// check that the element was actually contained
assertTrue(allNewPartitions.remove(p));
}
- for (KafkaTopicPartitionLeader p : parts3new) {
+ for (KafkaTopicPartition p : parts3new) {
// check that the element was actually contained
assertTrue(allNewPartitions.remove(p));
}
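The determinism and stability these tests pin down is what an index-based round-robin assignment provides: a partition's position alone decides its owner, so appending new partitions never reassigns existing ones. A sketch of such a scheme (the actual assignPartitions may differ in detail):

    // Sketch: deterministic round-robin by partition index.
    static java.util.List<KafkaTopicPartition> assignRoundRobin(
            java.util.List<KafkaTopicPartition> allPartitions, int numConsumers, int consumerIndex) {
        java.util.List<KafkaTopicPartition> mine = new java.util.ArrayList<>();
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                mine.add(allPartitions.get(i));
            }
        }
        return mine;
    }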
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 340950b..aa5344b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -26,8 +26,6 @@ import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.server.KafkaServer;
-import org.apache.commons.collections.map.LinkedMap;
-
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
@@ -66,12 +64,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
@@ -98,7 +94,6 @@ import org.junit.Rule;
import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -176,70 +171,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
}
- /**
- * Test that validates that checkpointing and checkpoint notification works properly
- */
- public void runCheckpointingTest() throws Exception {
- createTestTopic("testCheckpointing", 1, 1);
-
- FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("testCheckpointing", new SimpleStringSchema(), standardProps);
- Field pendingCheckpointsField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
- pendingCheckpointsField.setAccessible(true);
- LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
-
- Assert.assertEquals(0, pendingCheckpoints.size());
- source.setRuntimeContext(new MockRuntimeContext(1, 0));
-
- final HashMap<KafkaTopicPartition, Long> initialOffsets = new HashMap<>();
- initialOffsets.put(new KafkaTopicPartition("testCheckpointing", 0), 1337L);
-
- // first restore
- source.restoreState(initialOffsets);
-
- // then open
- source.open(new Configuration());
- HashMap<KafkaTopicPartition, Long> state1 = source.snapshotState(1, 15);
-
- assertEquals(initialOffsets, state1);
-
- HashMap<KafkaTopicPartition, Long> state2 = source.snapshotState(2, 30);
- Assert.assertEquals(initialOffsets, state2);
-
- Assert.assertEquals(2, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(1);
- Assert.assertEquals(1, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(2);
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(666); // invalid checkpoint
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- // create 500 snapshots
- for (int i = 100; i < 600; i++) {
- source.snapshotState(i, 15 * i);
- }
- Assert.assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
-
- // commit only the second last
- source.notifyCheckpointComplete(598);
- Assert.assertEquals(1, pendingCheckpoints.size());
-
- // access invalid checkpoint
- source.notifyCheckpointComplete(590);
-
- // and the last
- source.notifyCheckpointComplete(599);
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- source.close();
-
- deleteTestTopic("testCheckpointing");
- }
-
-
-
+
/**
* Ensure Kafka is working on both producer and consumer side.
* This executes a job that contains two Flink pipelines.
@@ -409,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env.enableCheckpointing(500);
env.setParallelism(parallelism);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
env.getConfig().disableSysoutLogging();
FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
@@ -454,7 +386,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env.enableCheckpointing(500);
env.setParallelism(parallelism);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
env.getConfig().disableSysoutLogging();
FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
@@ -499,7 +431,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env.enableCheckpointing(500);
env.setParallelism(parallelism);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
env.getConfig().disableSysoutLogging();
env.setBufferTimeout(0);
@@ -562,7 +494,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
runnerThread.start();
// wait a bit before canceling
- Thread.sleep(8000);
+ Thread.sleep(2000);
Throwable failueCause = jobError.get();
if(failueCause != null) {
@@ -634,10 +566,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
runnerThread.start();
// wait a bit before canceling
- Thread.sleep(8000);
+ Thread.sleep(2000);
Throwable failueCause = error.get();
- if(failueCause != null) {
+ if (failueCause != null) {
failueCause.printStackTrace();
Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
}
@@ -709,7 +641,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final int NUM_ELEMENTS = 20;
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-
+ env.getConfig().disableSysoutLogging();
+
// create topics with content
final List<String> topics = new ArrayList<>();
for (int i = 0; i < NUM_TOPICS; i++) {
@@ -745,6 +678,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// run second job consuming from multiple topics
env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.getConfig().disableSysoutLogging();
+
stream = env.addSource(kafkaServer.getConsumer(topics, schema, standardProps));
stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
@@ -1453,50 +1388,50 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
///////////// Testing the Kafka consumer with embedded watermark generation functionality ///////////////
- @RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
- public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
-
- final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
- final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
-
- final Map<String, Boolean> topics = new HashMap<>();
- topics.put(topic1, false);
- topics.put(topic2, emptyPartition);
-
- final int noOfTopcis = topics.size();
- final int partitionsPerTopic = 1;
- final int elementsPerPartition = 100 + 1;
-
- final int totalElements = emptyPartition ?
- partitionsPerTopic * elementsPerPartition :
- noOfTopcis * partitionsPerTopic * elementsPerPartition;
-
- createTestTopic(topic1, partitionsPerTopic, 1);
- createTestTopic(topic2, partitionsPerTopic, 1);
-
- final StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setParallelism(partitionsPerTopic);
- env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
- env.getConfig().disableSysoutLogging();
-
- TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
-
- Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
- producerProperties.setProperty("retries", "0");
-
- putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
-
- List<String> topicTitles = new ArrayList<>(topics.keySet());
- runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
-
- executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
-
- for(String topic: topicTitles) {
- deleteTestTopic(topic);
- }
- }
+// @RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
+// public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
+//
+// final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
+// final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
+//
+// final Map<String, Boolean> topics = new HashMap<>();
+// topics.put(topic1, false);
+// topics.put(topic2, emptyPartition);
+//
+// final int noOfTopcis = topics.size();
+// final int partitionsPerTopic = 1;
+// final int elementsPerPartition = 100 + 1;
+//
+// final int totalElements = emptyPartition ?
+// partitionsPerTopic * elementsPerPartition :
+// noOfTopcis * partitionsPerTopic * elementsPerPartition;
+//
+// createTestTopic(topic1, partitionsPerTopic, 1);
+// createTestTopic(topic2, partitionsPerTopic, 1);
+//
+// final StreamExecutionEnvironment env =
+// StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+// env.setParallelism(partitionsPerTopic);
+// env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
+// env.getConfig().disableSysoutLogging();
+//
+// TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
+//
+// Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+// producerProperties.setProperty("retries", "0");
+//
+// putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
+//
+// List<String> topicTitles = new ArrayList<>(topics.keySet());
+// runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
+//
+// executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
+//
+// for(String topic: topicTitles) {
+// deleteTestTopic(topic);
+// }
+// }
private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index e251174..14e74f1 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -167,7 +167,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
// ------------------------------------------------------------------------
- public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
+ public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
private final int expectedPartitions;
@@ -177,12 +177,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
@Override
- public int partition(Object next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- Tuple2<Long, String> tuple = (Tuple2<Long, String>) next;
-
+ public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
assertEquals(expectedPartitions, numPartitions);
- return (int) (tuple.f0 % numPartitions);
+ return (int) (next.f0 % numPartitions);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9f8159c..9e3c33b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -26,16 +26,16 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.InstantiationUtil;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,14 +44,18 @@ import java.io.Serializable;
import java.util.Properties;
import static org.apache.flink.test.util.TestUtils.tryExecute;
-
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* A class containing a special Kafka broker which has a log retention of only 250 ms.
* This way, we can make sure our consumer properly handles cases where we run into
* offset-out-of-range errors.
*/
+@SuppressWarnings("serial")
public class KafkaShortRetentionTestBase implements Serializable {
+
protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
+
private static KafkaTestEnvironment kafkaServer;
private static Properties standardProps;
private static ForkableFlinkMiniCluster flink;
@@ -108,7 +112,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
final String topic = "auto-offset-reset-test";
final int parallelism = 1;
- final int elementsPerPartition = 50000; // with a sleep time of 1 ms per element, test should run for 50 s
+ final int elementsPerPartition = 50000;
Properties tprops = new Properties();
tprops.setProperty("retention.ms", "250");
@@ -162,6 +166,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
kafkaServer.deleteTestTopic(topic);
}
+
private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
private int numJumps;
long nextExpected = 0;
@@ -205,12 +210,8 @@ public class KafkaShortRetentionTestBase implements Serializable {
*/
public void runFailOnAutoOffsetResetNone() throws Exception {
final String topic = "auto-offset-reset-none-test";
-
final int parallelism = 1;
- final int elementsPerPartition = 50000; // with a sleep time of 1 ms per element, test should run for 50 s
- final int totalElements = parallelism * elementsPerPartition;
-
-
+
kafkaServer.createTestTopic(topic, parallelism, 1);
final StreamExecutionEnvironment env =
@@ -218,8 +219,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
env.getConfig().disableSysoutLogging();
-
-
+
// ----------- add consumer ----------
Properties customProps = new Properties();
@@ -245,4 +245,27 @@ public class KafkaShortRetentionTestBase implements Serializable {
kafkaServer.deleteTestTopic(topic);
}
+ public void runFailOnAutoOffsetResetNoneEager() throws Exception {
+ final String topic = "auto-offset-reset-none-test";
+ final int parallelism = 1;
+
+ kafkaServer.createTestTopic(topic, parallelism, 1);
+
+ // ----------- add consumer ----------
+
+ Properties customProps = new Properties();
+ customProps.putAll(standardProps);
+ customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
+
+ try {
+ kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
+ fail("should fail with an exception");
+ }
+ catch (IllegalArgumentException e) {
+ // expected
+ assertTrue(e.getMessage().contains("none"));
+ }
+
+ kafkaServer.deleteTestTopic(topic);
+ }
}
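runFailOnAutoOffsetResetNoneEager expects getConsumer to reject 'auto.offset.reset=none' at construction time. A sketch of such eager validation (the accepted values depend on the Kafka version; this assumes the 0.8-style 'largest'/'smallest'):

    // Sketch: fail fast on an unsupported 'auto.offset.reset' value instead of
    // failing later, when the consumer first runs into an out-of-range offset.
    static void validateAutoOffsetResetValue(java.util.Properties config) {
        final String val = config.getProperty("auto.offset.reset", "largest");
        if (!(val.equals("largest") || val.equals("smallest"))) {
            throw new IllegalArgumentException(
                    "Cannot use '" + val + "' for 'auto.offset.reset'. Valid values: 'largest', 'smallest'.");
        }
    }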
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
new file mode 100644
index 0000000..0e16263
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import static org.junit.Assert.*;
+
+public class KafkaTopicPartitionTest {
+
+ @Test
+ public void validateUid() {
+ Field uidField;
+ try {
+ uidField = KafkaTopicPartition.class.getDeclaredField("serialVersionUID");
+ uidField.setAccessible(true);
+ }
+ catch (NoSuchFieldException e) {
+ fail("serialVersionUID is not defined");
+ return;
+ }
+
+ assertTrue(Modifier.isStatic(uidField.getModifiers()));
+ assertTrue(Modifier.isFinal(uidField.getModifiers()));
+ assertTrue(Modifier.isPrivate(uidField.getModifiers()));
+
+ assertEquals(long.class, uidField.getType());
+
+ // the UID has to be constant to make sure old checkpoints/savepoints can be read
+ try {
+ assertEquals(722083576322742325L, uidField.getLong(null));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
index e94adb5..24822ed 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -36,30 +36,39 @@ public class JobManagerCommunicationUtils {
public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
+ JobStatusMessage status = null;
- // find the jobID
- Future<Object> listResponse = jobManager.ask(
- JobManagerMessages.getRequestRunningJobsStatus(),
- askTimeout);
-
- List<JobStatusMessage> jobs;
- try {
- Object result = Await.result(listResponse, askTimeout);
- jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
- }
- catch (Exception e) {
- throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
- }
+ for (int i = 0; i < 200; i++) {
+ // find the jobID
+ Future<Object> listResponse = jobManager.ask(
+ JobManagerMessages.getRequestRunningJobsStatus(),
+ askTimeout);
+
+ List<JobStatusMessage> jobs;
+ try {
+ Object result = Await.result(listResponse, askTimeout);
+ jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+ }
+ catch (Exception e) {
+ throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
+ }
- if (jobs.isEmpty()) {
- throw new Exception("Could not cancel job - no running jobs");
- }
- if (jobs.size() != 1) {
- throw new Exception("Could not cancel job - more than one running job.");
+ if (jobs.isEmpty()) {
+ // try again, fall through the loop
+ Thread.sleep(50);
+ }
+ else if (jobs.size() == 1) {
+ status = jobs.get(0);
+ break; // found the single running job - stop polling
+ }
+ else {
+ throw new Exception("Could not cancel job - more than one running job.");
+ }
}
- JobStatusMessage status = jobs.get(0);
- if (status.getJobState().isTerminalState()) {
+ if (status == null) {
+ throw new Exception("Could not cancel job - no running jobs");
+ }
+ else if (status.getJobState().isTerminalState()) {
throw new Exception("Could not cancel job - job is not running any more");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 17e2e6f..e74eee4 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
@@ -44,6 +43,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+@SuppressWarnings("deprecation")
public class MockRuntimeContext extends StreamingRuntimeContext {
private final int numberOfParallelSubtasks;
@@ -57,15 +57,6 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
this.indexOfThisSubtask = indexOfThisSubtask;
}
- private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
- private static final long serialVersionUID = -1153976702711944427L;
-
- @Override
- public ExecutionConfig getExecutionConfig() {
- return new ExecutionConfig();
- }
- }
-
@Override
public boolean isCheckpointingEnabled() {
return true;
@@ -152,12 +143,12 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
}
@Override
- public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
+ public <S> org.apache.flink.api.common.state.OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
throw new UnsupportedOperationException();
}
@Override
- public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
+ public <S> org.apache.flink.api.common.state.OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
throw new UnsupportedOperationException();
}
@@ -175,4 +166,15 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException();
}
+
+ // ------------------------------------------------------------------------
+
+ private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
+ private static final long serialVersionUID = -1153976702711944427L;
+
+ @Override
+ public ExecutionConfig getExecutionConfig() {
+ return new ExecutionConfig();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
index 4b17300..4388c9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.functions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.watermark.Watermark;
+import javax.annotation.Nullable;
+
/**
* The {@code AssignerWithPeriodicWatermarks} assigns event time timestamps to elements,
* and generates low watermarks that signal event time progress within the stream.
@@ -71,5 +73,6 @@ public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T>
*
* @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
*/
+ @Nullable
Watermark getCurrentWatermark();
}
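With getCurrentWatermark() now annotated @Nullable, implementations can explicitly signal "nothing to emit yet". A minimal illustration (MyEvent and its getTimestamp() are hypothetical):

    // Sketch: periodic watermark that trails the max seen timestamp by a fixed
    // bound, and is null before the first element arrives.
    class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

        private static final long MAX_LATENESS_MS = 3500;
        private long maxTimestampSeen = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            long ts = element.getTimestamp();
            maxTimestampSeen = Math.max(maxTimestampSeen, ts);
            return ts;
        }

        @Override
        @Nullable
        public Watermark getCurrentWatermark() {
            // null until the first element arrives - now explicitly allowed
            return maxTimestampSeen == Long.MIN_VALUE
                    ? null : new Watermark(maxTimestampSeen - MAX_LATENESS_MS);
        }
    }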
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
index 48f29b2..5b5694c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.functions;
import org.apache.flink.streaming.api.watermark.Watermark;
+import javax.annotation.Nullable;
+
/**
* The {@code AssignerWithPunctuatedWatermarks} assigns event time timestamps to elements,
* and generates low watermarks that signal event time progress within the stream.
@@ -79,5 +81,6 @@ public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T
*
* @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
*/
+ @Nullable
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
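And the punctuated counterpart, where returning null is the common case and only marker elements produce a watermark (MyEvent and its isEndOfBatch() flag are hypothetical):

    // Sketch: only marker elements punctuate the stream with a watermark.
    class MarkerPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            return element.getTimestamp();
        }

        @Override
        @Nullable
        public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
            // non-marker elements produce no watermark; null is the normal case
            return lastElement.isEndOfBatch() ? new Watermark(extractedTimestamp) : null;
        }
    }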