Posted to commits@flink.apache.org by se...@apache.org on 2016/04/13 20:51:24 UTC

[1/5] flink git commit: [hotfix] [kafka consumer] Increase Kafka test stability

Repository: flink
Updated Branches:
  refs/heads/master b0a7a1b81 -> 1a34f2165


[hotfix] [kafka consumer] Increase Kafka test stability


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2728f924
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2728f924
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2728f924

Branch: refs/heads/master
Commit: 2728f924cf64cd62929b8f0e394a1d4335af8156
Parents: b0a7a1b
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 13 10:38:37 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:48 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java        | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2728f924/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8ff67b4..a65a411 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1278,13 +1278,24 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 							new Tuple2Partitioner(parallelism)))
 					.setParallelism(parallelism);
 
-			writeEnv.execute("Write sequence");
+			try {
+				writeEnv.execute("Write sequence");
+			}
+			catch (Exception e) {
+				LOG.error("Write attempt failed, trying again", e);
+				deleteTestTopic(topicName);
+				JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+				continue;
+			}
+			
 			LOG.info("Finished writing sequence");
 
 			// -------- Validate the Sequence --------
 			
 			// we need to validate the sequence, because kafka's producers are not exactly once
 			LOG.info("Validating sequence");
+
+			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
 			
 			final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 			readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
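
The stabilization above amounts to a retry-on-transient-failure loop: if the
write job fails, the test cleans up the topic, waits until no job is running,
and tries again instead of failing outright. A minimal, self-contained sketch
of that pattern (the helper names and the attempt cap are hypothetical
stand-ins, not the Flink test utilities, which retry inside their enclosing
loop):

    import java.util.concurrent.ThreadLocalRandom;

    /** Hypothetical sketch of the retry loop above; not Flink code. */
    public class WriteRetrySketch {

        public static void main(String[] args) throws Exception {
            final int maxAttempts = 3;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    flakyWrite();    // stands in for writeEnv.execute("Write sequence")
                    System.out.println("write succeeded on attempt " + attempt);
                    return;
                }
                catch (Exception e) {
                    if (attempt == maxAttempts) {
                        throw e;     // out of retries: surface the failure
                    }
                    // stands in for deleteTestTopic(...) and
                    // JobManagerCommunicationUtils.waitUntilNoJobIsRunning(...)
                    System.out.println("attempt " + attempt + " failed, cleaning up and retrying");
                }
            }
        }

        private static void flakyWrite() throws Exception {
            // simulate an occasionally failing Kafka write
            if (ThreadLocalRandom.current().nextBoolean()) {
                throw new Exception("simulated transient failure");
            }
        }
    }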


[4/5] flink git commit: [FLINK-3743] upgrading breeze from 0.11.2 to 0.12

Posted by se...@apache.org.
[FLINK-3743] upgrading breeze from 0.11.2 to 0.12

This closes #1876


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a34f216
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a34f216
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a34f216

Branch: refs/heads/master
Commit: 1a34f2165b217a4579df410dd7ccb879d6a58083
Parents: 2dcd27f
Author: Todd Lisonbee <to...@intel.com>
Authored: Mon Apr 11 23:20:43 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200

----------------------------------------------------------------------
 flink-libraries/flink-ml/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a34f216/flink-libraries/flink-ml/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml
index 986cb0f..0a3531e 100644
--- a/flink-libraries/flink-ml/pom.xml
+++ b/flink-libraries/flink-ml/pom.xml
@@ -44,7 +44,7 @@
 		<dependency>
 			<groupId>org.scalanlp</groupId>
 			<artifactId>breeze_${scala.binary.version}</artifactId>
-			<version>0.11.2</version>
+			<version>0.12</version>
 		</dependency>
 
 		<dependency>


[2/5] flink git commit: [FLINK-3747] Consolidate TimestampAssigner Methods in Kafka Consumer

Posted by se...@apache.org.
[FLINK-3747] Consolidate TimestampAssigner Methods in Kafka Consumer

This closes #1877


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e40e29da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e40e29da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e40e29da

Branch: refs/heads/master
Commit: e40e29da9b68ec6da59f7b5372cb1483283c0530
Parents: 8570b6d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Apr 13 11:41:39 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumerBase.java        |  4 ++--
 .../connectors/kafka/FlinkKafkaConsumerBaseTest.java    | 12 ++++++------
 .../connectors/kafka/KafkaConsumerTestBase.java         |  2 +-
 3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 0ca8fd5..ed5c72f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -148,7 +148,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * @param assigner The timestamp assigner / watermark generator to use.
 	 * @return The consumer object, to allow function chaining.   
 	 */
-	public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
+	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
 		checkNotNull(assigner);
 		
 		if (this.periodicWatermarkAssigner != null) {
@@ -182,7 +182,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * @param assigner The timestamp assigner / watermark generator to use.
 	 * @return The consumer object, to allow function chaining.   
 	 */
-	public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
+	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
 		checkNotNull(assigner);
 		
 		if (this.punctuatedWatermarkAssigner != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index f4ef995..9b517df 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -44,12 +44,12 @@ public class FlinkKafkaConsumerBaseTest {
 	@Test
 	public void testEitherWatermarkExtractor() {
 		try {
-			new DummyFlinkKafkaConsumer<>().setPeriodicWatermarkEmitter(null);
+			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
 			fail();
 		} catch (NullPointerException ignored) {}
 
 		try {
-			new DummyFlinkKafkaConsumer<>().setPunctuatedWatermarkEmitter(null);
+			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
 			fail();
 		} catch (NullPointerException ignored) {}
 		
@@ -59,16 +59,16 @@ public class FlinkKafkaConsumerBaseTest {
 		final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
 		
 		DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
-		c1.setPeriodicWatermarkEmitter(periodicAssigner);
+		c1.assignTimestampsAndWatermarks(periodicAssigner);
 		try {
-			c1.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+			c1.assignTimestampsAndWatermarks(punctuatedAssigner);
 			fail();
 		} catch (IllegalStateException ignored) {}
 
 		DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
-		c2.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+		c2.assignTimestampsAndWatermarks(punctuatedAssigner);
 		try {
-			c2.setPeriodicWatermarkEmitter(periodicAssigner);
+			c2.assignTimestampsAndWatermarks(periodicAssigner);
 			fail();
 		} catch (IllegalStateException ignored) {}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index a65a411..cc9205c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1662,7 +1662,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
 			.getConsumer(topics, sourceSchema, standardProps)
-			.setPunctuatedWatermarkEmitter(new TestPunctuatedTSExtractor());
+			.assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());
 
 		DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);
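
With this change both assigner variants are registered through the same
method name, matching DataStream.assignTimestampsAndWatermarks(). A hedged
usage sketch (the consumer class FlinkKafkaConsumer08, topic, schema and
properties are placeholder assumptions; only the method name and its two
overloads come from this commit):

    Properties kafkaProps = new Properties();   // broker/group settings omitted
    FlinkKafkaConsumerBase<Long> consumer =
            new FlinkKafkaConsumer08<>("events", new MyLongSchema(), kafkaProps);

    consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Long>() {

        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(Long element, long previousElementTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, element);
            return element;                      // the element value doubles as its timestamp
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(maxTimestamp);  // watermark trails the maximum seen timestamp
        }
    });

    // Registering a punctuated assigner on the same consumer afterwards now
    // fails with an IllegalStateException, as FlinkKafkaConsumerBaseTest verifies.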
 


[3/5] flink git commit: [FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks.

Posted by se...@apache.org.
[FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dcd27f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dcd27f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dcd27f4

Branch: refs/heads/master
Commit: 2dcd27f403c2a7f10791bfe21c45e2a326aa46a1
Parents: e40e29d
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 13 15:45:51 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200

----------------------------------------------------------------------
 .../kafka/internals/AbstractFetcher.java        |  77 +++--
 .../kafka/internals/ExceptionProxy.java         |  60 +++-
 .../connectors/kafka/util/KafkaUtils.java       |  33 +-
 .../AbstractFetcherTimestampsTest.java          | 306 +++++++++++++++++++
 .../kafka/testutils/MockRuntimeContext.java     |  46 ++-
 5 files changed, 478 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 594aa66..8183575 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -328,45 +328,66 @@ public abstract class AbstractFetcher<T, KPH> {
 			ClassLoader userCodeClassLoader)
 		throws IOException, ClassNotFoundException
 	{
-		@SuppressWarnings("unchecked")
-		KafkaTopicPartitionState<KPH>[] partitions =
-				(KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
-
-		int pos = 0;
-		for (KafkaTopicPartition partition : assignedPartitions) {
-			// create the kafka version specific partition handle
-			KPH kafkaHandle = createKafkaPartitionHandle(partition);
+		switch (timestampWatermarkMode) {
 			
-			// create the partition state
-			KafkaTopicPartitionState<KPH> partitionState;
-			switch (timestampWatermarkMode) {
-				case NO_TIMESTAMPS_WATERMARKS:
-					partitionState = new KafkaTopicPartitionState<>(partition, kafkaHandle);
-					break;
-				case PERIODIC_WATERMARKS: {
+			case NO_TIMESTAMPS_WATERMARKS: {
+				@SuppressWarnings("unchecked")
+				KafkaTopicPartitionState<KPH>[] partitions =
+						(KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
+
+				int pos = 0;
+				for (KafkaTopicPartition partition : assignedPartitions) {
+					// create the kafka version specific partition handle
+					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+					partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle);
+				}
+
+				return partitions;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				@SuppressWarnings("unchecked")
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
+								new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
+
+				int pos = 0;
+				for (KafkaTopicPartition partition : assignedPartitions) {
+					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
 					AssignerWithPeriodicWatermarks<T> assignerInstance =
 							watermarksPeriodic.deserializeValue(userCodeClassLoader);
-					partitionState = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+					
+					partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
 							partition, kafkaHandle, assignerInstance);
-					break;
 				}
-					
-				case PUNCTUATED_WATERMARKS: {
+
+				return partitions;
+			}
+
+			case PUNCTUATED_WATERMARKS: {
+				@SuppressWarnings("unchecked")
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
+								new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()];
+
+				int pos = 0;
+				for (KafkaTopicPartition partition : assignedPartitions) {
+					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
 					AssignerWithPunctuatedWatermarks<T> assignerInstance =
 							watermarksPunctuated.deserializeValue(userCodeClassLoader);
-					partitionState = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+
+					partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
 							partition, kafkaHandle, assignerInstance);
-					break;
 				}
-				default:
-					// cannot happen, add this as a guard for the future
-					throw new RuntimeException();
-			}
 
-			partitions[pos++] = partitionState;
+				return partitions;
+			}
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
 		}
-		
-		return partitions;
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
index 9a0e4e3..c736493 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -22,7 +22,48 @@ import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
+ * A proxy that communicates exceptions between threads. Typically used if an exception
+ * from a spawned thread needs to be recognized by the "parent" (spawner) thread.
  * 
+ * <p>The spawned thread would set the exception via {@link #reportError(Throwable)}.
+ * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}.
+ * Optionally, the parent can pass itself in the constructor to be interrupted as soon as
+ * an exception occurs.
+ * 
+ * <pre>
+ * {@code
+ * 
+ * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());
+ * 
+ * Thread subThread = new Thread() {
+ * 
+ *     public void run() {
+ *         try {
+ *             doSomething();
+ *         } catch (Throwable t) {
+ *             errorProxy.reportError(t);
+ *         } finally {
+ *             doSomeCleanup();
+ *         }
+ *     }
+ * };
+ * subThread.start();
+ * 
+ * doSomethingElse();
+ * errorProxy.checkAndThrowException();
+ * 
+ * doSomethingMore();
+ * errorProxy.checkAndThrowException();
+ * 
+ * try {
+ *     subThread.join();
+ * } catch (InterruptedException e) {
+ *     errorProxy.checkAndThrowException();
+ *     // restore interrupted status, if not caused by an exception
+ *     Thread.currentThread().interrupt();
+ * }
+ * }
+ * </pre>
  */
 public class ExceptionProxy {
 	
@@ -33,6 +74,8 @@ public class ExceptionProxy {
 	private final AtomicReference<Throwable> exception;
 
 	/**
+	 * Creates an exception proxy that interrupts the given thread upon
+	 * report of an exception. The thread to interrupt may be null.
 	 * 
 	 * @param toInterrupt The thread to interrupt upon an exception. May be null.
 	 */
@@ -44,18 +87,27 @@ public class ExceptionProxy {
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Sets the exception occurred and interrupts the target thread,
+	 * Sets the exception and interrupts the target thread,
 	 * if no other exception has occurred so far.
 	 * 
+	 * <p>The exception is only set (and the interruption is only triggered),
+	 * if no other exception was set before.
+	 * 
 	 * @param t The exception that occurred
 	 */
 	public void reportError(Throwable t) {
-		// set the exception, if it is the first
-		if (exception.compareAndSet(null, t) && toInterrupt != null) {
+		// set the exception, if it is the first (and the exception is non null)
+		if (t != null && exception.compareAndSet(null, t) && toInterrupt != null) {
 			toInterrupt.interrupt();
 		}
 	}
-	
+
+	/**
+	 * Checks whether an exception has been set via {@link #reportError(Throwable)}.
+	 * If yes, that exception is re-thrown by this method.
+	 * 
+	 * @throws Exception This method re-throws the exception, if set.
+	 */
 	public void checkAndThrowException() throws Exception {
 		Throwable t = exception.get();
 		if (t != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
index bda90bd..fc07247 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
@@ -19,23 +19,36 @@ package org.apache.flink.streaming.connectors.kafka.util;
 
 import java.util.Properties;
 
+/**
+ * Simple utilities, used by the Flink Kafka Consumers.
+ */
 public class KafkaUtils {
 
 	public static int getIntFromConfig(Properties config, String key, int defaultValue) {
-		try {
-			return Integer.parseInt(config.getProperty(key, Integer.toString(defaultValue)));
-		} catch(NumberFormatException nfe) {
-			throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
-					"Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'");
+		String val = config.getProperty(key);
+		if (val == null) {
+			return defaultValue;
+		} else {
+			try {
+				return Integer.parseInt(val);
+			} catch (NumberFormatException nfe) {
+				throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
+						"Entered value='" + val + "'. Default value='" + defaultValue + "'");
+			}
 		}
 	}
 
 	public static long getLongFromConfig(Properties config, String key, long defaultValue) {
-		try {
-			return Long.parseLong(config.getProperty(key, Long.toString(defaultValue)));
-		} catch(NumberFormatException nfe) {
-			throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
-					"Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'");
+		String val = config.getProperty(key);
+		if (val == null) {
+			return defaultValue;
+		} else {
+			try {
+				return Long.parseLong(val);
+			} catch (NumberFormatException nfe) {
+				throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
+						"Entered value='" + val + "'. Default value='" + defaultValue + "'");
+			}
 		}
 	}
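
The change above makes the lookup explicit: a missing key now returns the
default directly, instead of formatting the default to a string and parsing
it back, while a malformed value still fails with the offending value in the
message. A short hedged usage sketch (the key name is a placeholder):

    Properties config = new Properties();

    // key absent: the default is returned as-is
    int fetchSize = KafkaUtils.getIntFromConfig(config, "flink.fetch-size", 1024);
    // fetchSize == 1024

    // key present but malformed: IllegalArgumentException naming the bad value
    config.setProperty("flink.fetch-size", "lots");
    try {
        KafkaUtils.getIntFromConfig(config, "flink.fetch-size", 1024);
    } catch (IllegalArgumentException expected) {
        // "Value for configuration key='flink.fetch-size' is not set correctly. ..."
    }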
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
new file mode 100644
index 0000000..c073a04
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class AbstractFetcherTimestampsTest {
+	
+	@Test
+	public void testPunctuatedWatermarks() throws Exception {
+		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+				new KafkaTopicPartition("test topic name", 7),
+				new KafkaTopicPartition("test topic name", 13),
+				new KafkaTopicPartition("test topic name", 21));
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+				sourceContext, originalPartitions, null,
+				new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
+				new MockRuntimeContext(17, 3));
+
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+		// elements generate a watermark if the timestamp is a multiple of three
+		
+		// elements for partition 1
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+		assertFalse(sourceContext.hasWatermark());
+
+		// elements for partition 2
+		fetcher.emitRecord(12L, part2, 1L);
+		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+		assertFalse(sourceContext.hasWatermark());
+
+		// elements for partition 3
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
+		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+		
+		// now, we should have a watermark
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+		
+		// advance partition 3
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
+		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+		// advance partition 1 beyond partition 2 - this bumps the watermark
+		fetcher.emitRecord(30L, part1, 4L);
+		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 2 again - this bumps the watermark
+		fetcher.emitRecord(13L, part2, 2L);
+		assertFalse(sourceContext.hasWatermark());
+		fetcher.emitRecord(14L, part2, 3L);
+		assertFalse(sourceContext.hasWatermark());
+		fetcher.emitRecord(15L, part2, 3L);
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
+	}
+	
+	@Test
+	public void testPeriodicWatermarks() throws Exception {
+		ExecutionConfig config = new ExecutionConfig();
+		config.setAutoWatermarkInterval(10);
+		
+		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+				new KafkaTopicPartition("test topic name", 7),
+				new KafkaTopicPartition("test topic name", 13),
+				new KafkaTopicPartition("test topic name", 21));
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+				sourceContext, originalPartitions,
+				new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
+				null, new MockRuntimeContext(17, 3, config, sourceContext.getCheckpointLock()));
+
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+		// elements generate a watermark if the timestamp is a multiple of three
+
+		// elements for partition 1
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+
+		// elements for partition 2
+		fetcher.emitRecord(12L, part2, 1L);
+		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+
+		// elements for partition 3
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
+		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+		// now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 3
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
+		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+		// advance partition 1 beyond partition 2 - this bumps the watermark
+		fetcher.emitRecord(30L, part1, 4L);
+		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+		
+		// this blocks until the periodic thread emitted the watermark
+		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 2 again - this bumps the watermark
+		fetcher.emitRecord(13L, part2, 2L);
+		fetcher.emitRecord(14L, part2, 3L);
+		fetcher.emitRecord(15L, part2, 3L);
+
+		// this blocks until the periodic thread emitted the watermark
+		long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
+		assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+
+		protected TestFetcher(
+				SourceContext<T> sourceContext,
+				List<KafkaTopicPartition> assignedPartitions,
+				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+				StreamingRuntimeContext runtimeContext) throws Exception
+		{
+			super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
+		}
+
+		@Override
+		public void runFetchLoop() throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void cancel() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
+			return new Object();
+		}
+
+		@Override
+		public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestSourceContext<T> implements SourceContext<T> {
+
+		private final Object checkpointLock = new Object();
+		private final Object watermarkLock = new Object();
+
+		private volatile StreamRecord<T> latestElement;
+		private volatile Watermark currentWatermark;
+
+		@Override
+		public void collect(T element) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			this.latestElement = new StreamRecord<T>(element, timestamp);
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			synchronized (watermarkLock) {
+				currentWatermark = mark;
+				watermarkLock.notifyAll();
+			}
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return checkpointLock;
+		}
+
+		@Override
+		public void close() {}
+
+		public StreamRecord<T> getLatestElement() {
+			return latestElement;
+		}
+
+		public boolean hasWatermark() {
+			return currentWatermark != null;
+		}
+		
+		public Watermark getLatestWatermark() throws InterruptedException {
+			synchronized (watermarkLock) {
+				while (currentWatermark == null) {
+					watermarkLock.wait();
+				}
+				Watermark wm = currentWatermark;
+				currentWatermark = null;
+				return wm;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
+
+		private volatile long maxTimestamp = Long.MIN_VALUE;
+		
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			maxTimestamp = Math.max(maxTimestamp, element);
+			return element;
+		}
+
+		@Nullable
+		@Override
+		public Watermark getCurrentWatermark() {
+			return new Watermark(maxTimestamp);
+		}
+	}
+
+	private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
+
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			return element;
+		}
+
+		@Nullable
+		@Override
+		public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+			return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
+		}
+		
+	}
+}
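
Both tests above exercise the same rule: the watermark the fetcher emits is
the minimum of the current per-partition watermarks, so a single slow
partition holds the overall watermark back. A self-contained sketch of that
aggregation (illustrative only, not the AbstractFetcher implementation):

    import java.util.Arrays;

    public class MinWatermarkSketch {

        public static void main(String[] args) {
            // per-partition watermarks at the point where the punctuated test
            // asserts an overall watermark of 12: partition 1 at 30,
            // partition 2 at 12, partition 3 at 1005
            long[] perPartition = { 30L, 12L, 1005L };

            // the emitted watermark may only advance to the minimum across
            // partitions, otherwise late elements from a slow partition could
            // arrive behind a watermark that was already sent downstream
            long overall = Arrays.stream(perPartition).min().getAsLong();

            System.out.println(overall); // prints 12
        }
    }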

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index e74eee4..3e46503 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -37,24 +37,43 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("deprecation")
 public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	private final int numberOfParallelSubtasks;
 	private final int indexOfThisSubtask;
+	
+	private final ExecutionConfig execConfig;
+	private final Object checkpointLock;
 
+	private ScheduledExecutorService timer;
+	
 	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
+		this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null);
+	}
+	
+	public MockRuntimeContext(
+			int numberOfParallelSubtasks, int indexOfThisSubtask, 
+			ExecutionConfig execConfig,
+			Object checkpointLock) {
 		super(new MockStreamOperator(),
 				new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
 				Collections.<String, Accumulator<?, ?>>emptyMap());
+		
 		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
 		this.indexOfThisSubtask = indexOfThisSubtask;
+		this.execConfig = execConfig;
+		this.checkpointLock = checkpointLock;
 	}
 
 	@Override
@@ -64,7 +83,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	@Override
 	public String getTaskName() {
-		return null;
+		return "mock task";
 	}
 
 	@Override
@@ -84,7 +103,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	@Override
 	public ExecutionConfig getExecutionConfig() {
-		throw new UnsupportedOperationException();
+		return execConfig;
 	}
 
 	@Override
@@ -167,6 +186,29 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 		throw new UnsupportedOperationException();
 	}
 	
+	@Override
+	public void registerTimer(final long time, final Triggerable target) {
+		if (timer == null) {
+			timer = Executors.newSingleThreadScheduledExecutor();
+		}
+		
+		final long delay = Math.max(time - System.currentTimeMillis(), 0);
+
+		timer.schedule(new Runnable() {
+			@Override
+			public void run() {
+				synchronized (checkpointLock) {
+					try {
+						target.trigger(time);
+					} catch (Throwable t) {
+						System.err.println("!!! Caught exception while processing timer. !!!");
+						t.printStackTrace();
+					}
+				}
+			}
+		}, delay, TimeUnit.MILLISECONDS);
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static class MockStreamOperator extends AbstractStreamOperator<Integer> {


[5/5] flink git commit: [FLINK-3745] [runtime] Fix early stopping of stream sources

Posted by se...@apache.org.
[FLINK-3745] [runtime] Fix early stopping of stream sources


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8570b6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8570b6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8570b6dc

Branch: refs/heads/master
Commit: 8570b6dc3ae3c69dc50e81a46835d40df8a03992
Parents: 2728f92
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 13 12:26:42 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200

----------------------------------------------------------------------
 .../tasks/StoppableSourceStreamTask.java        | 15 +++-
 .../tasks/SourceStreamTaskStoppingTest.java     | 94 ++++++++++++++++++++
 .../runtime/tasks/SourceStreamTaskTest.java     | 40 +--------
 .../streaming/timestamp/TimestampITCase.java    | 18 ++--
 .../test/classloading/jar/UserCodeType.java     |  1 +
 5 files changed, 123 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
index 5173796..7ff39b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
@@ -31,9 +31,20 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
 	extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {
 
+	private volatile boolean stopped;
+
 	@Override
-	public void stop() {
-		this.headOperator.stop();
+	protected void run() throws Exception {
+		if (!stopped) {
+			super.run();
+		}
 	}
 
+	@Override
+	public void stop() {
+		stopped = true;
+		if (this.headOperator != null) {
+			this.headOperator.stop();
+		}
+	}
 }
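
The volatile flag is needed because stop() can arrive before the task thread
has entered run(), i.e. while headOperator is still null, in which case the
stop request would otherwise be lost. A self-contained sketch of the guard
(illustrative, not the Flink classes):

    public class EarlyStopGuardSketch {

        private volatile boolean stopped;
        private Runnable work;   // stands in for headOperator; may be null when stop() arrives

        public void run() {
            // a stop request that arrived before initialization makes run() a no-op
            if (!stopped) {
                work.run();
            }
        }

        public void stop() {
            stopped = true;      // remember the request, even before initialization
            if (work != null) {
                // forward the stop to the running work here, like headOperator.stop()
            }
        }

        public static void main(String[] args) {
            EarlyStopGuardSketch task = new EarlyStopGuardSketch();
            task.stop();                                           // stop before initialization
            task.work = () -> System.out.println("should not run");
            task.run();                                            // prints nothing
        }
    }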

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
new file mode 100644
index 0000000..ab9e59b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * These tests verify that stop() requests on a stoppable source stream task are
+ * honored, both when the head operator is already set and when stop() arrives
+ * before the head operator has been initialized.
+ */
+public class SourceStreamTaskStoppingTest {
+
+
+	// test flag for testStop()
+	static boolean stopped = false;
+
+	@Test
+	public void testStop() {
+		final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>();
+		sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource());
+
+		sourceTask.stop();
+
+		assertTrue(stopped);
+	}
+
+	@Test
+	public void testStopBeforeInitialization() throws Exception {
+
+		final StoppableSourceStreamTask<Object, StoppableFailingSource> sourceTask = new StoppableSourceStreamTask<>();
+		sourceTask.stop();
+		
+		sourceTask.headOperator = new StoppableStreamSource<>(new StoppableFailingSource());
+		sourceTask.run();
+	}
+	
+	// ------------------------------------------------------------------------
+
+	private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction {
+		private static final long serialVersionUID = 728864804042338806L;
+
+		@Override
+		public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object> ctx)
+				throws Exception {
+		}
+
+		@Override
+		public void cancel() {}
+
+		@Override
+		public void stop() {
+			stopped = true;
+		}
+	}
+
+	private static class StoppableFailingSource extends RichSourceFunction<Object> implements StoppableFunction {
+		private static final long serialVersionUID = 728864804042338806L;
+
+		@Override
+		public void run(SourceContext<Object> ctx) throws Exception {
+			fail("should not be called");
+		}
+
+		@Override
+		public void cancel() {}
+
+		@Override
+		public void stop() {}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index bfb2d34..cb779b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -28,12 +27,13 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -77,19 +77,6 @@ public class SourceStreamTaskTest {
 		Assert.assertEquals(10, resultElements.size());
 	}
 
-	// test flag for testStop()
-	static boolean stopped = false;
-
-	@Test
-	public void testStop() {
-		final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>();
-		sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource());
-
-		sourceTask.stop();
-
-		Assert.assertTrue(stopped);
-	}
-
 	/**
 	 * This test ensures that the SourceStreamTask properly serializes checkpointing
 	 * and element emission. This also verifies that there are no concurrent invocations
@@ -155,24 +142,7 @@ public class SourceStreamTaskTest {
 		}
 	}
 
-	private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction {
-		private static final long serialVersionUID = 728864804042338806L;
-
-		@Override
-		public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object> ctx)
-				throws Exception {
-		}
-
-		@Override
-		public void cancel() {}
-
-		@Override
-		public void stop() {
-			stopped = true;
-		}
-	}
-
-	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed {
+	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
 		private static final long serialVersionUID = 1;
 
 		private int maxElements;
@@ -240,9 +210,7 @@ public class SourceStreamTaskTest {
 		}
 
 		@Override
-		public void restoreState(Serializable state) {
-
-		}
+		public void restoreState(Serializable state) {}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 1a59ab3..d857672 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -203,7 +203,7 @@ public class TimestampITCase {
 					// try until we get the running jobs
 					List<JobID> running;
 					while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
-						Thread.sleep(100);
+						Thread.sleep(50);
 					}
 
 					JobID id = running.get(0);
@@ -223,22 +223,26 @@ public class TimestampITCase {
 		env.execute();
 
 		// verify that all the watermarks arrived at the final custom operator
-		for (int i = 0; i < PARALLELISM; i++) {
+		for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) {
+			
 			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
 			// other source stops emitting after that
-			for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
-				if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
+			for (int j = 0; j < subtaskWatermarks.size(); j++) {
+				if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) {
 					System.err.println("All Watermarks: ");
 					for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
-						System.err.println(CustomOperator.finalWatermarks[i].get(k));
+						System.err.println(subtaskWatermarks.get(k));
 					}
 
 					fail("Wrong watermark.");
 				}
 			}
 			
-			assertNotEquals(Watermark.MAX_WATERMARK,
-					CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
+			// if there are watermarks, the final one must not be the MAX watermark
+			if (subtaskWatermarks.size() > 0) {
+				assertNotEquals(Watermark.MAX_WATERMARK,
+						subtaskWatermarks.get(subtaskWatermarks.size()-1));
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
index 333c01a..a073cba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
@@ -48,6 +48,7 @@ public class UserCodeType {
 		int port = Integer.parseInt(args[2]);
 
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		env.getConfig().disableSysoutLogging();
 
 		DataSet<Integer> input = env.fromElements(1,2,3,4,5);