You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/04/13 20:51:24 UTC
[1/5] flink git commit: [hotfix] [kafka consumer] Increase Kafka test
stability
Repository: flink
Updated Branches:
refs/heads/master b0a7a1b81 -> 1a34f2165
[hotfix] [kafka consumer] Increase Kafka test stability
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2728f924
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2728f924
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2728f924
Branch: refs/heads/master
Commit: 2728f924cf64cd62929b8f0e394a1d4335af8156
Parents: b0a7a1b
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 13 10:38:37 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:48 2016 +0200
----------------------------------------------------------------------
.../connectors/kafka/KafkaConsumerTestBase.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2728f924/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8ff67b4..a65a411 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1278,13 +1278,24 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
new Tuple2Partitioner(parallelism)))
.setParallelism(parallelism);
- writeEnv.execute("Write sequence");
+ try {
+ writeEnv.execute("Write sequence");
+ }
+ catch (Exception e) {
+ LOG.error("Write attempt failed, trying again", e);
+ deleteTestTopic(topicName);
+ JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+ continue;
+ }
+
LOG.info("Finished writing sequence");
// -------- Validate the Sequence --------
// we need to validate the sequence, because kafka's producers are not exactly once
LOG.info("Validating sequence");
+
+ JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
[4/5] flink git commit: [FLINK-3743] upgrading breeze from 0.11.2 to
0.12
Posted by se...@apache.org.
[FLINK-3743] upgrading breeze from 0.11.2 to 0.12
This closes #1876
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a34f216
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a34f216
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a34f216
Branch: refs/heads/master
Commit: 1a34f2165b217a4579df410dd7ccb879d6a58083
Parents: 2dcd27f
Author: Todd Lisonbee <to...@intel.com>
Authored: Mon Apr 11 23:20:43 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200
----------------------------------------------------------------------
flink-libraries/flink-ml/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a34f216/flink-libraries/flink-ml/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml
index 986cb0f..0a3531e 100644
--- a/flink-libraries/flink-ml/pom.xml
+++ b/flink-libraries/flink-ml/pom.xml
@@ -44,7 +44,7 @@
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.binary.version}</artifactId>
- <version>0.11.2</version>
+ <version>0.12</version>
</dependency>
<dependency>
[2/5] flink git commit: [FLINK-3747] Consolidate TimestampAssigner
Methods in Kafka Consumer
Posted by se...@apache.org.
[FLINK-3747] Consolidate TimestampAssigner Methods in Kafka Consumer
This closes #1877
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e40e29da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e40e29da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e40e29da
Branch: refs/heads/master
Commit: e40e29da9b68ec6da59f7b5372cb1483283c0530
Parents: 8570b6d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Apr 13 11:41:39 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumerBase.java | 4 ++--
.../connectors/kafka/FlinkKafkaConsumerBaseTest.java | 12 ++++++------
.../connectors/kafka/KafkaConsumerTestBase.java | 2 +-
3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 0ca8fd5..ed5c72f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -148,7 +148,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* @param assigner The timestamp assigner / watermark generator to use.
* @return The consumer object, to allow function chaining.
*/
- public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
+ public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
checkNotNull(assigner);
if (this.periodicWatermarkAssigner != null) {
@@ -182,7 +182,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* @param assigner The timestamp assigner / watermark generator to use.
* @return The consumer object, to allow function chaining.
*/
- public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
+ public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
checkNotNull(assigner);
if (this.punctuatedWatermarkAssigner != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index f4ef995..9b517df 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -44,12 +44,12 @@ public class FlinkKafkaConsumerBaseTest {
@Test
public void testEitherWatermarkExtractor() {
try {
- new DummyFlinkKafkaConsumer<>().setPeriodicWatermarkEmitter(null);
+ new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
fail();
} catch (NullPointerException ignored) {}
try {
- new DummyFlinkKafkaConsumer<>().setPunctuatedWatermarkEmitter(null);
+ new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
fail();
} catch (NullPointerException ignored) {}
@@ -59,16 +59,16 @@ public class FlinkKafkaConsumerBaseTest {
final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
- c1.setPeriodicWatermarkEmitter(periodicAssigner);
+ c1.assignTimestampsAndWatermarks(periodicAssigner);
try {
- c1.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+ c1.assignTimestampsAndWatermarks(punctuatedAssigner);
fail();
} catch (IllegalStateException ignored) {}
DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
- c2.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+ c2.assignTimestampsAndWatermarks(punctuatedAssigner);
try {
- c2.setPeriodicWatermarkEmitter(periodicAssigner);
+ c2.assignTimestampsAndWatermarks(periodicAssigner);
fail();
} catch (IllegalStateException ignored) {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index a65a411..cc9205c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1662,7 +1662,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
.getConsumer(topics, sourceSchema, standardProps)
- .setPunctuatedWatermarkEmitter(new TestPunctuatedTSExtractor());
+ .assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());
DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);
[3/5] flink git commit: [FLINK-3375] [Kafka Connector] Add tests for
per-kafka-partition watermarks.
Posted by se...@apache.org.
[FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dcd27f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dcd27f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dcd27f4
Branch: refs/heads/master
Commit: 2dcd27f403c2a7f10791bfe21c45e2a326aa46a1
Parents: e40e29d
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 13 15:45:51 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200
----------------------------------------------------------------------
.../kafka/internals/AbstractFetcher.java | 77 +++--
.../kafka/internals/ExceptionProxy.java | 60 +++-
.../connectors/kafka/util/KafkaUtils.java | 33 +-
.../AbstractFetcherTimestampsTest.java | 306 +++++++++++++++++++
.../kafka/testutils/MockRuntimeContext.java | 46 ++-
5 files changed, 478 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 594aa66..8183575 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -328,45 +328,66 @@ public abstract class AbstractFetcher<T, KPH> {
ClassLoader userCodeClassLoader)
throws IOException, ClassNotFoundException
{
- @SuppressWarnings("unchecked")
- KafkaTopicPartitionState<KPH>[] partitions =
- (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
-
- int pos = 0;
- for (KafkaTopicPartition partition : assignedPartitions) {
- // create the kafka version specific partition handle
- KPH kafkaHandle = createKafkaPartitionHandle(partition);
+ switch (timestampWatermarkMode) {
- // create the partition state
- KafkaTopicPartitionState<KPH> partitionState;
- switch (timestampWatermarkMode) {
- case NO_TIMESTAMPS_WATERMARKS:
- partitionState = new KafkaTopicPartitionState<>(partition, kafkaHandle);
- break;
- case PERIODIC_WATERMARKS: {
+ case NO_TIMESTAMPS_WATERMARKS: {
+ @SuppressWarnings("unchecked")
+ KafkaTopicPartitionState<KPH>[] partitions =
+ (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
+
+ int pos = 0;
+ for (KafkaTopicPartition partition : assignedPartitions) {
+ // create the kafka version specific partition handle
+ KPH kafkaHandle = createKafkaPartitionHandle(partition);
+ partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle);
+ }
+
+ return partitions;
+ }
+
+ case PERIODIC_WATERMARKS: {
+ @SuppressWarnings("unchecked")
+ KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+ (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
+ new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
+
+ int pos = 0;
+ for (KafkaTopicPartition partition : assignedPartitions) {
+ KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
AssignerWithPeriodicWatermarks<T> assignerInstance =
watermarksPeriodic.deserializeValue(userCodeClassLoader);
- partitionState = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+
+ partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
partition, kafkaHandle, assignerInstance);
- break;
}
-
- case PUNCTUATED_WATERMARKS: {
+
+ return partitions;
+ }
+
+ case PUNCTUATED_WATERMARKS: {
+ @SuppressWarnings("unchecked")
+ KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+ (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
+ new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()];
+
+ int pos = 0;
+ for (KafkaTopicPartition partition : assignedPartitions) {
+ KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
AssignerWithPunctuatedWatermarks<T> assignerInstance =
watermarksPunctuated.deserializeValue(userCodeClassLoader);
- partitionState = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+
+ partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
partition, kafkaHandle, assignerInstance);
- break;
}
- default:
- // cannot happen, add this as a guard for the future
- throw new RuntimeException();
- }
- partitions[pos++] = partitionState;
+ return partitions;
+ }
+ default:
+ // cannot happen, add this as a guard for the future
+ throw new RuntimeException();
}
-
- return partitions;
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
index 9a0e4e3..c736493 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -22,7 +22,48 @@ import javax.annotation.Nullable;
import java.util.concurrent.atomic.AtomicReference;
/**
+ * A proxy that communicates exceptions between threads. Typically used if an exception
+ * from a spawned thread needs to be recognized by the "parent" (spawner) thread.
*
+ * <p>The spawned thread would set the exception via {@link #reportError(Throwable)}.
+ * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}.
+ * Optionally, the parent can pass itself in the constructor to be interrupted as soon as
+ * an exception occurs.
+ *
+ * <pre>
+ * {@code
+ *
+ * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());
+ *
+ * Thread subThread = new Thread() {
+ *
+ * public void run() {
+ * try {
+ * doSomething();
+ * } catch (Throwable t) {
+ * errorProxy.reportError(t);
+ * } finally {
+ * doSomeCleanup();
+ * }
+ * }
+ * };
+ * subThread.start();
+ *
+ * doSomethingElse();
+ * errorProxy.checkAndThrowException();
+ *
+ * doSomethingMore();
+ * errorProxy.checkAndThrowException();
+ *
+ * try {
+ * subThread.join();
+ * } catch (InterruptedException e) {
+ * errorProxy.checkAndThrowException();
+ * // restore interrupted status, if not caused by an exception
+ * Thread.currentThread().interrupt();
+ * }
+ * }
+ * </pre>
*/
public class ExceptionProxy {
@@ -33,6 +74,8 @@ public class ExceptionProxy {
private final AtomicReference<Throwable> exception;
/**
+ * Creates an exception proxy that interrupts the given thread upon
+ * report of an exception. The thread to interrupt may be null.
*
* @param toInterrupt The thread to interrupt upon an exception. May be null.
*/
@@ -44,18 +87,27 @@ public class ExceptionProxy {
// ------------------------------------------------------------------------
/**
- * Sets the exception occurred and interrupts the target thread,
+ * Sets the exception and interrupts the target thread,
* if no other exception has occurred so far.
*
+ * <p>The exception is only set (and the interruption is only triggered),
+ * if no other exception was set before.
+ *
* @param t The exception that occurred
*/
public void reportError(Throwable t) {
- // set the exception, if it is the first
- if (exception.compareAndSet(null, t) && toInterrupt != null) {
+ // set the exception, if it is the first (and the exception is non null)
+ if (t != null && exception.compareAndSet(null, t) && toInterrupt != null) {
toInterrupt.interrupt();
}
}
-
+
+ /**
+ * Checks whether an exception has been set via {@link #reportError(Throwable)}.
+ * If yes, that exception is re-thrown by this method.
+ *
+ * @throws Exception This method re-throws the exception, if set.
+ */
public void checkAndThrowException() throws Exception {
Throwable t = exception.get();
if (t != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
index bda90bd..fc07247 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
@@ -19,23 +19,36 @@ package org.apache.flink.streaming.connectors.kafka.util;
import java.util.Properties;
+/**
+ * Simple utilities, used by the Flink Kafka Consumers.
+ */
public class KafkaUtils {
public static int getIntFromConfig(Properties config, String key, int defaultValue) {
- try {
- return Integer.parseInt(config.getProperty(key, Integer.toString(defaultValue)));
- } catch(NumberFormatException nfe) {
- throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
- "Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'");
+ String val = config.getProperty(key);
+ if (val == null) {
+ return defaultValue;
+ } else {
+ try {
+ return Integer.parseInt(val);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
+ "Entered value='" + val + "'. Default value='" + defaultValue + "'");
+ }
}
}
public static long getLongFromConfig(Properties config, String key, long defaultValue) {
- try {
- return Long.parseLong(config.getProperty(key, Long.toString(defaultValue)));
- } catch(NumberFormatException nfe) {
- throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
- "Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'");
+ String val = config.getProperty(key);
+ if (val == null) {
+ return defaultValue;
+ } else {
+ try {
+ return Long.parseLong(val);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
+ "Entered value='" + val + "'. Default value='" + defaultValue + "'");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
new file mode 100644
index 0000000..c073a04
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class AbstractFetcherTimestampsTest {
+
+ @Test
+ public void testPunctuatedWatermarks() throws Exception {
+ List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+ new KafkaTopicPartition("test topic name", 7),
+ new KafkaTopicPartition("test topic name", 13),
+ new KafkaTopicPartition("test topic name", 21));
+
+ TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext, originalPartitions, null,
+ new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
+ new MockRuntimeContext(17, 3));
+
+ final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+ final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+ final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+ // elements generate a watermark if the timestamp is a multiple of three
+
+ // elements for partition 1
+ fetcher.emitRecord(1L, part1, 1L);
+ fetcher.emitRecord(2L, part1, 2L);
+ fetcher.emitRecord(3L, part1, 3L);
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+ assertFalse(sourceContext.hasWatermark());
+
+ // elements for partition 2
+ fetcher.emitRecord(12L, part2, 1L);
+ assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+ assertFalse(sourceContext.hasWatermark());
+
+ // elements for partition 3
+ fetcher.emitRecord(101L, part3, 1L);
+ fetcher.emitRecord(102L, part3, 2L);
+ assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+ // now, we should have a watermark
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 3
+ fetcher.emitRecord(1003L, part3, 3L);
+ fetcher.emitRecord(1004L, part3, 4L);
+ fetcher.emitRecord(1005L, part3, 5L);
+ assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+ // advance partition 1 beyond partition 2 - this bumps the watermark
+ fetcher.emitRecord(30L, part1, 4L);
+ assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 2 again - this bumps the watermark
+ fetcher.emitRecord(13L, part2, 2L);
+ assertFalse(sourceContext.hasWatermark());
+ fetcher.emitRecord(14L, part2, 3L);
+ assertFalse(sourceContext.hasWatermark());
+ fetcher.emitRecord(15L, part2, 3L);
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
+ }
+
+ @Test
+ public void testPeriodicWatermarks() throws Exception {
+ ExecutionConfig config = new ExecutionConfig();
+ config.setAutoWatermarkInterval(10);
+
+ List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+ new KafkaTopicPartition("test topic name", 7),
+ new KafkaTopicPartition("test topic name", 13),
+ new KafkaTopicPartition("test topic name", 21));
+
+ TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext, originalPartitions,
+ new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
+ null, new MockRuntimeContext(17, 3, config, sourceContext.getCheckpointLock()));
+
+ final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+ final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+ final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+ // the periodic assigner reports the largest timestamp seen so far as its watermark
+
+ // elements for partition 1
+ fetcher.emitRecord(1L, part1, 1L);
+ fetcher.emitRecord(2L, part1, 2L);
+ fetcher.emitRecord(3L, part1, 3L);
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+
+ // elements for partition 2
+ fetcher.emitRecord(12L, part2, 1L);
+ assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+
+ // elements for partition 3
+ fetcher.emitRecord(101L, part3, 1L);
+ fetcher.emitRecord(102L, part3, 2L);
+ assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+ // now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
+ assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 3
+ fetcher.emitRecord(1003L, part3, 3L);
+ fetcher.emitRecord(1004L, part3, 4L);
+ fetcher.emitRecord(1005L, part3, 5L);
+ assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+ // advance partition 1 beyond partition 2 - this bumps the watermark
+ fetcher.emitRecord(30L, part1, 4L);
+ assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+
+ // this blocks until the periodic thread emitted the watermark
+ assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 2 again - this bumps the watermark
+ fetcher.emitRecord(13L, part2, 2L);
+ fetcher.emitRecord(14L, part2, 3L);
+ fetcher.emitRecord(15L, part2, 3L);
+
+ // this blocks until the periodic thread emitted the watermark
+ long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
+ assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+ }
+
+ // ------------------------------------------------------------------------
+ // Test mocks
+ // ------------------------------------------------------------------------
+
+ private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+
+ protected TestFetcher(
+ SourceContext<T> sourceContext,
+ List<KafkaTopicPartition> assignedPartitions,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ StreamingRuntimeContext runtimeContext) throws Exception
+ {
+ super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
+ }
+
+ @Override
+ public void runFetchLoop() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cancel() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
+ return new Object();
+ }
+
+ @Override
+ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class TestSourceContext<T> implements SourceContext<T> {
+
+ private final Object checkpointLock = new Object();
+ private final Object watermarkLock = new Object();
+
+ private volatile StreamRecord<T> latestElement;
+ private volatile Watermark currentWatermark;
+
+ @Override
+ public void collect(T element) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ this.latestElement = new StreamRecord<T>(element, timestamp);
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ synchronized (watermarkLock) {
+ currentWatermark = mark;
+ watermarkLock.notifyAll();
+ }
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return checkpointLock;
+ }
+
+ @Override
+ public void close() {}
+
+ public StreamRecord<T> getLatestElement() {
+ return latestElement;
+ }
+
+ public boolean hasWatermark() {
+ return currentWatermark != null;
+ }
+
+ public Watermark getLatestWatermark() throws InterruptedException {
+ synchronized (watermarkLock) {
+ while (currentWatermark == null) {
+ watermarkLock.wait();
+ }
+ Watermark wm = currentWatermark;
+ currentWatermark = null;
+ return wm;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
+
+ private volatile long maxTimestamp = Long.MIN_VALUE;
+
+ @Override
+ public long extractTimestamp(Long element, long previousElementTimestamp) {
+ maxTimestamp = Math.max(maxTimestamp, element);
+ return element;
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(maxTimestamp);
+ }
+ }
+
+ private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
+
+ @Override
+ public long extractTimestamp(Long element, long previousElementTimestamp) {
+ return element;
+ }
+
+ @Nullable
+ @Override
+ public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+ return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index e74eee4..3e46503 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -37,24 +37,43 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
@SuppressWarnings("deprecation")
public class MockRuntimeContext extends StreamingRuntimeContext {
private final int numberOfParallelSubtasks;
private final int indexOfThisSubtask;
+
+ private final ExecutionConfig execConfig;
+ private final Object checkpointLock;
+ private ScheduledExecutorService timer;
+
/**
 * Creates a mock context by delegating to the four-argument constructor with a newly
 * created {@link ExecutionConfig} and {@code null} as the checkpoint lock.
 */
public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
+ this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null);
+ }
+
+    /**
+     * Creates a mock context with an explicit execution config and timer lock.
+     *
+     * @param numberOfParallelSubtasks value stored as the number of parallel subtasks
+     * @param indexOfThisSubtask value stored as the index of this subtask
+     * @param execConfig config returned by {@link #getExecutionConfig()}
+     * @param checkpointLock lock that timer callbacks registered through
+     *        {@link #registerTimer(long, Triggerable)} synchronize on; if {@code null},
+     *        a private lock object is used instead
+     */
+    public MockRuntimeContext(
+            int numberOfParallelSubtasks, int indexOfThisSubtask,
+            ExecutionConfig execConfig,
+            Object checkpointLock) {
         super(new MockStreamOperator(),
                 new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
                 Collections.<String, Accumulator<?, ?>>emptyMap());
+
         this.numberOfParallelSubtasks = numberOfParallelSubtasks;
         this.indexOfThisSubtask = indexOfThisSubtask;
+        this.execConfig = execConfig;
+        // The two-argument constructor passes null. Synchronizing on null throws a
+        // NullPointerException, so fall back to a private lock to keep timers usable.
+        this.checkpointLock = (checkpointLock != null) ? checkpointLock : new Object();
     }
@Override
@@ -64,7 +83,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
@Override
public String getTaskName() {
- return null;
+ return "mock task";
}
@Override
@@ -84,7 +103,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
@Override
public ExecutionConfig getExecutionConfig() {
- throw new UnsupportedOperationException();
+ return execConfig;
}
@Override
@@ -167,6 +186,29 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
throw new UnsupportedOperationException();
}
+ @Override
+ public void registerTimer(final long time, final Triggerable target) {
+ if (timer == null) {
+ timer = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ final long delay = Math.max(time - System.currentTimeMillis(), 0);
+
+ timer.schedule(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (checkpointLock) {
+ try {
+ target.trigger(time);
+ } catch (Throwable t) {
+ System.err.println("!!! Caught exception while processing timer. !!!");
+ t.printStackTrace();
+ }
+ }
+ }
+ }, delay, TimeUnit.MILLISECONDS);
+ }
+
// ------------------------------------------------------------------------
private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
[5/5] flink git commit: [FLINK-3745] [runtime] Fix early stopping of
stream sources
Posted by se...@apache.org.
[FLINK-3745] [runtime] Fix early stopping of stream sources
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8570b6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8570b6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8570b6dc
Branch: refs/heads/master
Commit: 8570b6dc3ae3c69dc50e81a46835d40df8a03992
Parents: 2728f92
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 13 12:26:42 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200
----------------------------------------------------------------------
.../tasks/StoppableSourceStreamTask.java | 15 +++-
.../tasks/SourceStreamTaskStoppingTest.java | 94 ++++++++++++++++++++
.../runtime/tasks/SourceStreamTaskTest.java | 40 +--------
.../streaming/timestamp/TimestampITCase.java | 18 ++--
.../test/classloading/jar/UserCodeType.java | 1 +
5 files changed, 123 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
index 5173796..7ff39b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
@@ -31,9 +31,20 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {
+ private volatile boolean stopped; // set by stop(), read by run()
+
@Override
- public void stop() {
- this.headOperator.stop();
+ protected void run() throws Exception {
+ if (!stopped) { // do not delegate to super.run() if stop() was already called
+ super.run();
+ }
}
+ @Override
+ public void stop() {
+ stopped = true;
+ if (this.headOperator != null) { // stop() may arrive before the head operator is set
+ this.headOperator.stop();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
new file mode 100644
index 0000000..ab9e59b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for stopping a {@link StoppableSourceStreamTask}: a stop request must be forwarded
+ * to the source function, and a task that is stopped before it runs must not invoke the
+ * source function at all.
+ */
+public class SourceStreamTaskStoppingTest {
+
+    // test flag for testStop(); set by StoppableSource.stop()
+    static boolean stopped = false;
+
+    /**
+     * Verifies that stopping the task calls {@code stop()} on the source function.
+     */
+    @Test
+    public void testStop() {
+        // reset the shared static flag so an earlier run in the same JVM cannot make
+        // the assertion below pass spuriously
+        stopped = false;
+
+        final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>();
+        sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource());
+
+        sourceTask.stop();
+
+        assertTrue(stopped);
+    }
+
+    /**
+     * Verifies that a task stopped before its head operator is set neither fails in
+     * {@code stop()} nor runs the source afterwards.
+     */
+    @Test
+    public void testStopBeforeInitialization() throws Exception {
+
+        final StoppableSourceStreamTask<Object, StoppableFailingSource> sourceTask = new StoppableSourceStreamTask<>();
+        // the head operator is still null at this point
+        sourceTask.stop();
+
+        sourceTask.headOperator = new StoppableStreamSource<>(new StoppableFailingSource());
+        // must return without calling StoppableFailingSource.run(), which fails the test
+        sourceTask.run();
+    }
+
+    // ------------------------------------------------------------------------
+
+    /** Source that does nothing when run and records a stop request in the test flag. */
+    private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction {
+        private static final long serialVersionUID = 728864804042338806L;
+
+        @Override
+        public void run(SourceContext<Object> ctx) throws Exception {
+        }
+
+        @Override
+        public void cancel() {}
+
+        @Override
+        public void stop() {
+            stopped = true;
+        }
+    }
+
+    /** Source that fails the test as soon as it is run. */
+    private static class StoppableFailingSource extends RichSourceFunction<Object> implements StoppableFunction {
+        private static final long serialVersionUID = 728864804042338806L;
+
+        @Override
+        public void run(SourceContext<Object> ctx) throws Exception {
+            fail("should not be called");
+        }
+
+        @Override
+        public void cancel() {}
+
+        @Override
+        public void stop() {}
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index bfb2d34..cb779b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.runtime.tasks;
-import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -28,12 +27,13 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.TestHarnessUtil;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -77,19 +77,6 @@ public class SourceStreamTaskTest {
Assert.assertEquals(10, resultElements.size());
}
- // test flag for testStop()
- static boolean stopped = false;
-
- @Test
- public void testStop() {
- final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>();
- sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource());
-
- sourceTask.stop();
-
- Assert.assertTrue(stopped);
- }
-
/**
* This test ensures that the SourceStreamTask properly serializes checkpointing
* and element emission. This also verifies that there are no concurrent invocations
@@ -155,24 +142,7 @@ public class SourceStreamTaskTest {
}
}
- private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction {
- private static final long serialVersionUID = 728864804042338806L;
-
- @Override
- public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object> ctx)
- throws Exception {
- }
-
- @Override
- public void cancel() {}
-
- @Override
- public void stop() {
- stopped = true;
- }
- }
-
- private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed {
+ private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
private static final long serialVersionUID = 1;
private int maxElements;
@@ -240,9 +210,7 @@ public class SourceStreamTaskTest {
}
@Override
- public void restoreState(Serializable state) {
-
- }
+ public void restoreState(Serializable state) {}
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 1a59ab3..d857672 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -203,7 +203,7 @@ public class TimestampITCase {
// try until we get the running jobs
List<JobID> running;
while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
- Thread.sleep(100);
+ Thread.sleep(50);
}
JobID id = running.get(0);
@@ -223,22 +223,26 @@ public class TimestampITCase {
env.execute();
// verify that all the watermarks arrived at the final custom operator
- for (int i = 0; i < PARALLELISM; i++) {
+ for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) {
+
// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
// other source stops emitting after that
- for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
- if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
+ for (int j = 0; j < subtaskWatermarks.size(); j++) {
+ if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) {
System.err.println("All Watermarks: ");
for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
- System.err.println(CustomOperator.finalWatermarks[i].get(k));
+ System.err.println(subtaskWatermarks.get(k));
}
fail("Wrong watermark.");
}
}
- assertNotEquals(Watermark.MAX_WATERMARK,
- CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
+ // if there are watermarks, the final one must not be the MAX watermark
+ if (subtaskWatermarks.size() > 0) {
+ assertNotEquals(Watermark.MAX_WATERMARK,
+ subtaskWatermarks.get(subtaskWatermarks.size()-1));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
index 333c01a..a073cba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
@@ -48,6 +48,7 @@ public class UserCodeType {
int port = Integer.parseInt(args[2]);
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+ env.getConfig().disableSysoutLogging();
DataSet<Integer> input = env.fromElements(1,2,3,4,5);