You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/02 13:18:42 UTC
[2/8] flink git commit: [FLINK-9041][tests] Refactor StreamTaskTest
to use java 8 CompletableFuture instead of scala/akka Promise
[FLINK-9041][tests] Refactor StreamTaskTest to use java 8 CompletableFuture instead of scala/akka Promise
This closes #5912.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40e412ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40e412ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40e412ae
Branch: refs/heads/master
Commit: 40e412ae233046a5f6f38cf86288ab5185d9d194
Parents: 3242214
Author: Andrey Zagrebin <an...@linkresearchtools.org>
Authored: Wed Apr 25 11:02:09 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 2 15:18:06 2018 +0200
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTaskTest.java | 41 +++++++-------------
pom.xml | 2 +-
2 files changed, 15 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/40e412ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index caea662..73a575e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
@@ -110,7 +111,6 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
-import akka.dispatch.Futures;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -125,6 +125,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -135,16 +136,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -178,7 +174,7 @@ public class StreamTaskTest extends TestLogger {
*/
@Test
public void testEarlyCanceling() throws Exception {
- Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+ Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2));
StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setOperatorID(new OperatorID(4711L, 42L));
cfg.setStreamOperator(new SlowlyDeserializingOperator());
@@ -194,7 +190,7 @@ public class StreamTaskTest extends TestLogger {
Future<ExecutionState> running = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
// wait until the task thread reached state RUNNING
- ExecutionState executionState = Await.result(running, deadline.timeLeft());
+ ExecutionState executionState = running.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
// make sure the task is really running
if (executionState != ExecutionState.RUNNING) {
@@ -208,7 +204,7 @@ public class StreamTaskTest extends TestLogger {
Future<ExecutionState> canceling = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
- executionState = Await.result(canceling, deadline.timeLeft());
+ executionState = canceling.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
// the task should reach state canceled eventually
assertTrue(executionState == ExecutionState.CANCELING ||
@@ -858,25 +854,17 @@ public class StreamTaskTest extends TestLogger {
private ExecutionState executionState = null;
- private final PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue = new PriorityQueue<>(
- 1,
- new Comparator<Tuple2<ExecutionState, Promise<ExecutionState>>>() {
- @Override
- public int compare(Tuple2<ExecutionState, Promise<ExecutionState>> o1, Tuple2<ExecutionState, Promise<ExecutionState>> o2) {
- return o1.f0.ordinal() - o2.f0.ordinal();
- }
- });
+ private final PriorityQueue<Tuple2<ExecutionState, CompletableFuture<ExecutionState>>> priorityQueue =
+ new PriorityQueue<>(1, Comparator.comparingInt(o -> o.f0.ordinal()));
- public Future<ExecutionState> notifyWhenExecutionState(ExecutionState executionState) {
+ Future<ExecutionState> notifyWhenExecutionState(ExecutionState executionState) {
synchronized (priorityQueue) {
if (this.executionState != null && this.executionState.ordinal() >= executionState.ordinal()) {
- return Futures.<ExecutionState>successful(executionState);
+ return CompletableFuture.completedFuture(executionState);
} else {
- Promise<ExecutionState> promise = new Promise.DefaultPromise<ExecutionState>();
-
+ CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
priorityQueue.offer(Tuple2.of(executionState, promise));
-
- return promise.future();
+ return promise;
}
}
}
@@ -887,9 +875,8 @@ public class StreamTaskTest extends TestLogger {
this.executionState = taskExecutionState.getExecutionState();
while (!priorityQueue.isEmpty() && priorityQueue.peek().f0.ordinal() <= executionState.ordinal()) {
- Promise<ExecutionState> promise = priorityQueue.poll().f1;
-
- promise.success(executionState);
+ CompletableFuture<ExecutionState> promise = priorityQueue.poll().f1;
+ promise.complete(executionState);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40e412ae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index baf4fda..3191da3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -452,7 +452,7 @@ under the License.
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
- <version>3.18.2-GA</version>
+ <version>3.19.0-GA</version>
</dependency>
<!-- joda time is pulled in different versions by different transitive dependencies-->