You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/14 12:10:10 UTC
[03/12] flink git commit: [tests] Add 'CheckedThread' as a common
test utility
[tests] Add 'CheckedThread' as a common test utility
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/790153c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/790153c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/790153c0
Branch: refs/heads/master
Commit: 790153c065f79ae0e8bb045b96c85f8195bc7a29
Parents: 3feea13
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 13 19:15:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100
----------------------------------------------------------------------
.../partition/InputGateConcurrentTest.java | 35 +-------
.../flink/core/testutils/CheckedThread.java | 85 ++++++++++++++++++++
2 files changed, 86 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/790153c0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 8cae04c..27177c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -223,40 +224,6 @@ public class InputGateConcurrentTest {
// testing threads
// ------------------------------------------------------------------------
- private static abstract class CheckedThread extends Thread {
-
- private volatile Throwable error;
-
- public abstract void go() throws Exception;
-
- @Override
- public void run() {
- try {
- go();
- }
- catch (Throwable t) {
- error = t;
- }
- }
-
- public void sync() throws Exception {
- join();
-
- // propagate the error
- if (error != null) {
- if (error instanceof Error) {
- throw (Error) error;
- }
- else if (error instanceof Exception) {
- throw (Exception) error;
- }
- else {
- throw new Exception(error.getMessage(), error);
- }
- }
- }
- }
-
private static class ProducerThread extends CheckedThread {
private final Random rnd = new Random();
http://git-wip-us.apache.org/repos/asf/flink/blob/790153c0/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
new file mode 100644
index 0000000..1dad8c8
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+/**
+ * A thread that additionally catches exceptions and offers a joining method that
+ * re-throws the exceptions.
+ *
+ * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one
+ * needs to extends this class and implement the {@link #go()} method. That method may
+ * throw exceptions.
+ *
+ * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this
+ * thread via the {@link #sync()} method.
+ */
+public abstract class CheckedThread extends Thread {
+
+ /** The error thrown from the main work method */
+ private volatile Throwable error;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * This method needs to be overwritten to contain the main work logic.
+ * It takes the role of {@link Thread#run()}, but should propagate exceptions.
+ *
+ * @throws Exception The exceptions thrown here will be re-thrown in the {@link #sync()} method.
+ */
+ public abstract void go() throws Exception;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * This method is final - thread work should go into the {@link #go()} method instead.
+ */
+ @Override
+ public final void run() {
+ try {
+ go();
+ }
+ catch (Throwable t) {
+ error = t;
+ }
+ }
+
+ /**
+ * Waits until the thread is completed and checks whether any error occurred during
+ * the execution.
+ *
+ * <p>This method blocks like {@link #join()}, but performs an additional check for
+ * exceptions thrown from the {@link #go()} method.
+ */
+ public void sync() throws Exception {
+ super.join();
+
+ // propagate the error
+ if (error != null) {
+ if (error instanceof Error) {
+ throw (Error) error;
+ }
+ else if (error instanceof Exception) {
+ throw (Exception) error;
+ }
+ else {
+ throw new Exception(error.getMessage(), error);
+ }
+ }
+ }
+}