You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/04/13 20:51:28 UTC

[5/5] flink git commit: [FLINK-3745] [runtime] Fix early stopping of stream sources

[FLINK-3745] [runtime] Fix early stopping of stream sources


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8570b6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8570b6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8570b6dc

Branch: refs/heads/master
Commit: 8570b6dc3ae3c69dc50e81a46835d40df8a03992
Parents: 2728f92
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 13 12:26:42 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200

----------------------------------------------------------------------
 .../tasks/StoppableSourceStreamTask.java        | 15 +++-
 .../tasks/SourceStreamTaskStoppingTest.java     | 94 ++++++++++++++++++++
 .../runtime/tasks/SourceStreamTaskTest.java     | 40 +--------
 .../streaming/timestamp/TimestampITCase.java    | 18 ++--
 .../test/classloading/jar/UserCodeType.java     |  1 +
 5 files changed, 123 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
index 5173796..7ff39b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
@@ -31,9 +31,20 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
 	extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {
 
+	private volatile boolean stopped;
+
 	@Override
-	public void stop() {
-		this.headOperator.stop();
+	protected void run() throws Exception {
+		if (!stopped) {
+			super.run();
+		}
 	}
 
+	@Override
+	public void stop() {
+		stopped = true;
+		if (this.headOperator != null) {
+			this.headOperator.stop();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
new file mode 100644
index 0000000..ab9e59b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * These tests verify that stopping a StoppableSourceStreamTask is forwarded to its source
+ * function, and that a task stopped before initialization does not run its source.
+ */
+public class SourceStreamTaskStoppingTest {
+
+
+	// test flag for testStop()
+	static boolean stopped = false;
+
+	@Test
+	public void testStop() {
+		final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>();
+		sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource());
+
+		sourceTask.stop();
+
+		assertTrue(stopped);
+	}
+
+	@Test
+	public void testStopBeforeInitialization() throws Exception {
+
+		final StoppableSourceStreamTask<Object, StoppableFailingSource> sourceTask = new StoppableSourceStreamTask<>();
+		sourceTask.stop();
+		
+		sourceTask.headOperator = new StoppableStreamSource<>(new StoppableFailingSource());
+		sourceTask.run();
+	}
+	
+	// ------------------------------------------------------------------------
+
+	private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction {
+		private static final long serialVersionUID = 728864804042338806L;
+
+		@Override
+		public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object> ctx)
+				throws Exception {
+		}
+
+		@Override
+		public void cancel() {}
+
+		@Override
+		public void stop() {
+			stopped = true;
+		}
+	}
+
+	private static class StoppableFailingSource extends RichSourceFunction<Object> implements StoppableFunction {
+		private static final long serialVersionUID = 728864804042338806L;
+
+		@Override
+		public void run(SourceContext<Object> ctx) throws Exception {
+			fail("should not be called");
+		}
+
+		@Override
+		public void cancel() {}
+
+		@Override
+		public void stop() {}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index bfb2d34..cb779b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -28,12 +27,13 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -77,19 +77,6 @@ public class SourceStreamTaskTest {
 		Assert.assertEquals(10, resultElements.size());
 	}
 
-	// test flag for testStop()
-	static boolean stopped = false;
-
-	@Test
-	public void testStop() {
-		final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>();
-		sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource());
-
-		sourceTask.stop();
-
-		Assert.assertTrue(stopped);
-	}
-
 	/**
 	 * This test ensures that the SourceStreamTask properly serializes checkpointing
 	 * and element emission. This also verifies that there are no concurrent invocations
@@ -155,24 +142,7 @@ public class SourceStreamTaskTest {
 		}
 	}
 
-	private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction {
-		private static final long serialVersionUID = 728864804042338806L;
-
-		@Override
-		public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object> ctx)
-				throws Exception {
-		}
-
-		@Override
-		public void cancel() {}
-
-		@Override
-		public void stop() {
-			stopped = true;
-		}
-	}
-
-	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed {
+	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
 		private static final long serialVersionUID = 1;
 
 		private int maxElements;
@@ -240,9 +210,7 @@ public class SourceStreamTaskTest {
 		}
 
 		@Override
-		public void restoreState(Serializable state) {
-
-		}
+		public void restoreState(Serializable state) {}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 1a59ab3..d857672 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -203,7 +203,7 @@ public class TimestampITCase {
 					// try until we get the running jobs
 					List<JobID> running;
 					while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
-						Thread.sleep(100);
+						Thread.sleep(50);
 					}
 
 					JobID id = running.get(0);
@@ -223,22 +223,26 @@ public class TimestampITCase {
 		env.execute();
 
 		// verify that all the watermarks arrived at the final custom operator
-		for (int i = 0; i < PARALLELISM; i++) {
+		for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) {
+			
 			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
 			// other source stops emitting after that
-			for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
-				if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
+			for (int j = 0; j < subtaskWatermarks.size(); j++) {
+				if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) {
 					System.err.println("All Watermarks: ");
 					for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
-						System.err.println(CustomOperator.finalWatermarks[i].get(k));
+						System.err.println(subtaskWatermarks.get(k));
 					}
 
 					fail("Wrong watermark.");
 				}
 			}
 			
-			assertNotEquals(Watermark.MAX_WATERMARK,
-					CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
+			// if there are watermarks, the final one must not be the MAX watermark
+			if (subtaskWatermarks.size() > 0) {
+				assertNotEquals(Watermark.MAX_WATERMARK,
+						subtaskWatermarks.get(subtaskWatermarks.size()-1));
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
index 333c01a..a073cba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
@@ -48,6 +48,7 @@ public class UserCodeType {
 		int port = Integer.parseInt(args[2]);
 
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		env.getConfig().disableSysoutLogging();
 
 		DataSet<Integer> input = env.fromElements(1,2,3,4,5);