You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/14 12:10:10 UTC

[03/12] flink git commit: [tests] Add 'CheckedThread' as a common test utility

[tests] Add 'CheckedThread' as a common test utility


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/790153c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/790153c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/790153c0

Branch: refs/heads/master
Commit: 790153c065f79ae0e8bb045b96c85f8195bc7a29
Parents: 3feea13
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 13 19:15:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100

----------------------------------------------------------------------
 .../partition/InputGateConcurrentTest.java      | 35 +-------
 .../flink/core/testutils/CheckedThread.java     | 85 ++++++++++++++++++++
 2 files changed, 86 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/790153c0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 8cae04c..27177c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -223,40 +224,6 @@ public class InputGateConcurrentTest {
 	//  testing threads
 	// ------------------------------------------------------------------------
 
-	private static abstract class CheckedThread extends Thread {
-
-		private volatile Throwable error;
-
-		public abstract void go() throws Exception;
-
-		@Override
-		public void run() {
-			try {
-				go();
-			}
-			catch (Throwable t) {
-				error = t;
-			}
-		}
-
-		public void sync() throws Exception {
-			join();
-
-			// propagate the error
-			if (error != null) {
-				if (error instanceof Error) {
-					throw (Error) error;
-				}
-				else if (error instanceof Exception) {
-					throw (Exception) error;
-				}
-				else {
-					throw new Exception(error.getMessage(), error);
-				}
-			}
-		}
-	}
-
 	private static class ProducerThread extends CheckedThread {
 
 		private final Random rnd = new Random();

http://git-wip-us.apache.org/repos/asf/flink/blob/790153c0/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
new file mode 100644
index 0000000..1dad8c8
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+/**
+ * A thread that additionally catches exceptions and offers a joining method that
+ * re-throws the exceptions.
+ *
+ * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one
+ * needs to extend this class and implement the {@link #go()} method. That method may
+ * throw exceptions.
+ *
+ * <p>Exceptions from the {@link #go()} method are caught and re-thrown when joining this
+ * thread via the {@link #sync()} method.
+ */
+public abstract class CheckedThread extends Thread {
+
+	/** The error thrown from the main work method; stays null if nothing was caught. */
+	private volatile Throwable error;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method needs to be overridden to contain the main work logic.
+	 * It takes the role of {@link Thread#run()}, but should propagate exceptions.
+	 *
+	 * @throws Exception The exceptions thrown here will be re-thrown in the {@link #sync()} method.
+	 */
+	public abstract void go() throws Exception;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method is final - thread work should go into the {@link #go()} method instead.
+	 */
+	@Override
+	public final void run() {
+		try {
+			go();
+		}
+		catch (Throwable t) {
+			error = t; // remembered so that sync() can re-throw it in the joining thread
+		}
+	}
+
+	/**
+	 * Waits until the thread is completed, blocking like {@link #join()}, and re-throws
+	 * the error (if any) that was caught from the {@link #go()} method.
+	 *
+	 * @throws Exception The error caught from {@link #go()} (wrapped if it is neither an Error
+	 *                   nor an Exception), or an InterruptedException if the join is interrupted.
+	 */
+	public void sync() throws Exception {
+		super.join();
+
+		// propagate the error, keeping its original type where possible
+		if (error != null) {
+			if (error instanceof Error) {
+				throw (Error) error;
+			}
+			else if (error instanceof Exception) {
+				throw (Exception) error;
+			}
+			else {
+				throw new Exception(error.getMessage(), error);
+			}
+		}
+	}
+}