You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/02 13:18:42 UTC

[2/8] flink git commit: [FLINK-9041][tests] Refactor StreamTaskTest to use Java 8 CompletableFuture instead of Scala/Akka Promise

[FLINK-9041][tests] Refactor StreamTaskTest to use Java 8 CompletableFuture instead of Scala/Akka Promise

This closes #5912.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40e412ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40e412ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40e412ae

Branch: refs/heads/master
Commit: 40e412ae233046a5f6f38cf86288ab5185d9d194
Parents: 3242214
Author: Andrey Zagrebin <an...@linkresearchtools.org>
Authored: Wed Apr 25 11:02:09 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 2 15:18:06 2018 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTaskTest.java | 41 +++++++-------------
 pom.xml                                         |  2 +-
 2 files changed, 15 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40e412ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index caea662..73a575e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -110,7 +111,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
-import akka.dispatch.Futures;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -125,6 +125,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -135,16 +136,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -178,7 +174,7 @@ public class StreamTaskTest extends TestLogger {
 	 */
 	@Test
 	public void testEarlyCanceling() throws Exception {
-		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+		Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2));
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setOperatorID(new OperatorID(4711L, 42L));
 		cfg.setStreamOperator(new SlowlyDeserializingOperator());
@@ -194,7 +190,7 @@ public class StreamTaskTest extends TestLogger {
 		Future<ExecutionState> running = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
 
 		// wait until the task thread reached state RUNNING
-		ExecutionState executionState = Await.result(running, deadline.timeLeft());
+		ExecutionState executionState = running.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 		// make sure the task is really running
 		if (executionState != ExecutionState.RUNNING) {
@@ -208,7 +204,7 @@ public class StreamTaskTest extends TestLogger {
 
 		Future<ExecutionState> canceling = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
 
-		executionState = Await.result(canceling, deadline.timeLeft());
+		executionState = canceling.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 		// the task should reach state canceled eventually
 		assertTrue(executionState == ExecutionState.CANCELING ||
@@ -858,25 +854,17 @@ public class StreamTaskTest extends TestLogger {
 
 		private ExecutionState executionState = null;
 
-		private final PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue = new PriorityQueue<>(
-			1,
-			new Comparator<Tuple2<ExecutionState, Promise<ExecutionState>>>() {
-				@Override
-				public int compare(Tuple2<ExecutionState, Promise<ExecutionState>> o1, Tuple2<ExecutionState, Promise<ExecutionState>> o2) {
-					return o1.f0.ordinal() - o2.f0.ordinal();
-				}
-			});
+		private final PriorityQueue<Tuple2<ExecutionState, CompletableFuture<ExecutionState>>> priorityQueue =
+			new PriorityQueue<>(1, Comparator.comparingInt(o -> o.f0.ordinal()));
 
-		public Future<ExecutionState> notifyWhenExecutionState(ExecutionState executionState) {
+		Future<ExecutionState> notifyWhenExecutionState(ExecutionState executionState) {
 			synchronized (priorityQueue) {
 				if (this.executionState != null && this.executionState.ordinal() >= executionState.ordinal()) {
-					return Futures.<ExecutionState>successful(executionState);
+					return CompletableFuture.completedFuture(executionState);
 				} else {
-					Promise<ExecutionState> promise = new Promise.DefaultPromise<ExecutionState>();
-
+					CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 					priorityQueue.offer(Tuple2.of(executionState, promise));
-
-					return promise.future();
+					return promise;
 				}
 			}
 		}
@@ -887,9 +875,8 @@ public class StreamTaskTest extends TestLogger {
 				this.executionState = taskExecutionState.getExecutionState();
 
 				while (!priorityQueue.isEmpty() && priorityQueue.peek().f0.ordinal() <= executionState.ordinal()) {
-					Promise<ExecutionState> promise = priorityQueue.poll().f1;
-
-					promise.success(executionState);
+					CompletableFuture<ExecutionState> promise = priorityQueue.poll().f1;
+					promise.complete(executionState);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/40e412ae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index baf4fda..3191da3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -452,7 +452,7 @@ under the License.
 			<dependency>
 				<groupId>org.javassist</groupId>
 				<artifactId>javassist</artifactId>
-				<version>3.18.2-GA</version>
+				<version>3.19.0-GA</version>
 			</dependency>
 
 			<!-- joda time is pulled in different versions by different transitive dependencies-->