You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/04/13 20:51:28 UTC
[5/5] flink git commit: [FLINK-3745] [runtime] Fix early stopping of
stream sources
[FLINK-3745] [runtime] Fix early stopping of stream sources
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8570b6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8570b6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8570b6dc
Branch: refs/heads/master
Commit: 8570b6dc3ae3c69dc50e81a46835d40df8a03992
Parents: 2728f92
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 13 12:26:42 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200
----------------------------------------------------------------------
.../tasks/StoppableSourceStreamTask.java | 15 +++-
.../tasks/SourceStreamTaskStoppingTest.java | 94 ++++++++++++++++++++
.../runtime/tasks/SourceStreamTaskTest.java | 40 +--------
.../streaming/timestamp/TimestampITCase.java | 18 ++--
.../test/classloading/jar/UserCodeType.java | 1 +
5 files changed, 123 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
index 5173796..7ff39b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
@@ -31,9 +31,20 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {
+ private volatile boolean stopped;
+
@Override
- public void stop() {
- this.headOperator.stop();
+ protected void run() throws Exception {
+ if (!stopped) {
+ super.run();
+ }
}
+ @Override
+ public void stop() {
+ stopped = true;
+ if (this.headOperator != null) {
+ this.headOperator.stop();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
new file mode 100644
index 0000000..ab9e59b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * These tests verify that stopping a {@link StoppableSourceStreamTask} is forwarded to its
+ * source function, and that a task stopped before initialization never runs its source.
+ */
+public class SourceStreamTaskStoppingTest {
+
+
+ // test flag for testStop()
+ static boolean stopped = false;
+
+ @Test
+ public void testStop() {
+ final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>();
+ sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource());
+
+ sourceTask.stop();
+
+ assertTrue(stopped);
+ }
+
+ @Test
+ public void testStopBeforeInitialization() throws Exception {
+
+ final StoppableSourceStreamTask<Object, StoppableFailingSource> sourceTask = new StoppableSourceStreamTask<>();
+ sourceTask.stop();
+
+ sourceTask.headOperator = new StoppableStreamSource<>(new StoppableFailingSource());
+ sourceTask.run();
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction {
+ private static final long serialVersionUID = 728864804042338806L;
+
+ @Override
+ public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object> ctx)
+ throws Exception {
+ }
+
+ @Override
+ public void cancel() {}
+
+ @Override
+ public void stop() {
+ stopped = true;
+ }
+ }
+
+ private static class StoppableFailingSource extends RichSourceFunction<Object> implements StoppableFunction {
+ private static final long serialVersionUID = 728864804042338806L;
+
+ @Override
+ public void run(SourceContext<Object> ctx) throws Exception {
+ fail("should not be called");
+ }
+
+ @Override
+ public void cancel() {}
+
+ @Override
+ public void stop() {}
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index bfb2d34..cb779b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.runtime.tasks;
-import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -28,12 +27,13 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.TestHarnessUtil;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -77,19 +77,6 @@ public class SourceStreamTaskTest {
Assert.assertEquals(10, resultElements.size());
}
- // test flag for testStop()
- static boolean stopped = false;
-
- @Test
- public void testStop() {
- final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>();
- sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource());
-
- sourceTask.stop();
-
- Assert.assertTrue(stopped);
- }
-
/**
* This test ensures that the SourceStreamTask properly serializes checkpointing
* and element emission. This also verifies that there are no concurrent invocations
@@ -155,24 +142,7 @@ public class SourceStreamTaskTest {
}
}
- private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction {
- private static final long serialVersionUID = 728864804042338806L;
-
- @Override
- public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object> ctx)
- throws Exception {
- }
-
- @Override
- public void cancel() {}
-
- @Override
- public void stop() {
- stopped = true;
- }
- }
-
- private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed {
+ private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
private static final long serialVersionUID = 1;
private int maxElements;
@@ -240,9 +210,7 @@ public class SourceStreamTaskTest {
}
@Override
- public void restoreState(Serializable state) {
-
- }
+ public void restoreState(Serializable state) {}
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 1a59ab3..d857672 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -203,7 +203,7 @@ public class TimestampITCase {
// try until we get the running jobs
List<JobID> running;
while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
- Thread.sleep(100);
+ Thread.sleep(50);
}
JobID id = running.get(0);
@@ -223,22 +223,26 @@ public class TimestampITCase {
env.execute();
// verify that all the watermarks arrived at the final custom operator
- for (int i = 0; i < PARALLELISM; i++) {
+ for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) {
+
// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
// other source stops emitting after that
- for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
- if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
+ for (int j = 0; j < subtaskWatermarks.size(); j++) {
+ if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) {
System.err.println("All Watermarks: ");
for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
- System.err.println(CustomOperator.finalWatermarks[i].get(k));
+ System.err.println(subtaskWatermarks.get(k));
}
fail("Wrong watermark.");
}
}
- assertNotEquals(Watermark.MAX_WATERMARK,
- CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
+ // if there are watermarks, the final one must not be the MAX watermark
+ if (subtaskWatermarks.size() > 0) {
+ assertNotEquals(Watermark.MAX_WATERMARK,
+ subtaskWatermarks.get(subtaskWatermarks.size()-1));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
index 333c01a..a073cba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
@@ -48,6 +48,7 @@ public class UserCodeType {
int port = Integer.parseInt(args[2]);
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+ env.getConfig().disableSysoutLogging();
DataSet<Integer> input = env.fromElements(1,2,3,4,5);