You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/08/08 13:27:26 UTC
flink git commit: Add runAsync method to execute runnables in the
main rpc thread
Repository: flink
Updated Branches:
refs/heads/flip-6 583911497 -> fee1bef80
Add runAsync method to execute runnables in the main rpc thread
Add CompletedFuture
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fee1bef8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fee1bef8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fee1bef8
Branch: refs/heads/flip-6
Commit: fee1bef804e4e4716f201283beb79e77ef8d36e6
Parents: 5839114
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 5 00:27:31 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Aug 8 15:19:20 2016 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 5 +++
.../org/apache/flink/runtime/rpc/RpcServer.java | 35 ++++++++++++++---
.../apache/flink/runtime/rpc/RpcService.java | 4 +-
.../flink/runtime/rpc/RunnableRpcGateway.java | 23 +++++++++++
.../flink/runtime/rpc/akka/AkkaRpcService.java | 8 ++--
.../runtime/rpc/akka/RunnableAkkaActor.java | 33 ++++++++++++++++
.../runtime/rpc/akka/RunnableAkkaGateway.java | 30 ++++++++++++++
.../rpc/akka/jobmaster/JobMasterAkkaActor.java | 6 +--
.../akka/jobmaster/JobMasterAkkaGateway.java | 4 +-
.../rpc/akka/messages/RunnableMessage.java | 31 +++++++++++++++
.../ResourceManagerAkkaActor.java | 6 +--
.../ResourceManagerAkkaGateway.java | 4 +-
.../taskexecutor/TaskExecutorAkkaActor.java | 6 +--
.../taskexecutor/TaskExecutorAkkaGateway.java | 4 +-
.../flink/runtime/rpc/jobmaster/JobMaster.java | 34 ++++++----------
.../rpc/resourcemanager/ResourceManager.java | 19 +--------
.../runtime/rpc/taskexecutor/TaskExecutor.java | 19 +--------
.../flink/runtime/rpc/RpcCompletenessTest.java | 24 ++++++------
.../runtime/util/DirectExecutorService.java | 41 +++++++++++++++++++-
pom.xml | 14 +++----
20 files changed, 245 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 5fea8fb..09c6fd0 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -189,6 +189,11 @@ under the License.
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
index fba9250..c064c09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
@@ -18,12 +18,21 @@
package org.apache.flink.runtime.rpc;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaGateway;
+
/**
- * Marker interface for rpc servers. Every rpc server should implement this interface.
+ * Base class for rpc servers. Every rpc server should implement this interface.
*
- * @param <C> Rpc client counter part matching the RpcServer
+ * @param <C> Rpc gateway counter part matching the RpcServer
*/
-public interface RpcServer<C extends RpcGateway> {
+public abstract class RpcServer<C extends RpcGateway> {
+ private final RpcService rpcService;
+ private C self;
+
+ public RpcServer(RpcService rpcService) {
+ this.rpcService = rpcService;
+ }
+
/**
* Get self-gateway which should be used to run asynchronous rpc calls on the server.
*
@@ -32,9 +41,23 @@ public interface RpcServer<C extends RpcGateway> {
*
* @return Self gateway
*/
- C getSelf();
+ public C getSelf() {
+ return self;
+ }
+
+ public void runAsync(Runnable runnable) {
+ ((RunnableAkkaGateway) self).runAsync(runnable);
+ }
+
+ public RpcService getRpcService() {
+ return rpcService;
+ }
- void start();
+ public void start() {
+ self = rpcService.startServer(this);
+ }
- void shutDown();
+ public void shutDown() {
+ rpcService.stopServer(self);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index a12039c..fddcf9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -23,9 +23,9 @@ import scala.concurrent.Future;
public interface RpcService {
<C extends RpcGateway> Future<C> connect(String address, Class<C> clazz);
- <S extends RpcServer, C extends RpcGateway> C startServer(S methodHandler, Class<C> rpcClientClass);
+ <S extends RpcServer, C extends RpcGateway> C startServer(S methodHandler);
- <T> void stopServer(T server);
+ <C extends RpcGateway> void stopServer(C gateway);
void stopService();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RunnableRpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RunnableRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RunnableRpcGateway.java
new file mode 100644
index 0000000..c05c5fa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RunnableRpcGateway.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+public interface RunnableRpcGateway {
+ void runAsync(Runnable runnable);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index cb707bb..858e41a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -84,7 +84,7 @@ public class AkkaRpcService implements RpcService {
}
@Override
- public <S extends RpcServer, C extends RpcGateway> C startServer(S methodHandler, Class<C> rpcClientClass) {
+ public <S extends RpcServer, C extends RpcGateway> C startServer(S methodHandler) {
ActorRef ref;
C self;
if (methodHandler instanceof TaskExecutor) {
@@ -115,9 +115,9 @@ public class AkkaRpcService implements RpcService {
}
@Override
- public <T> void stopServer(T server) {
- if (server instanceof AkkaGateway) {
- AkkaGateway akkaClient = (AkkaGateway) server;
+ public <C extends RpcGateway> void stopServer(C selfGateway) {
+ if (selfGateway instanceof AkkaGateway) {
+ AkkaGateway akkaClient = (AkkaGateway) selfGateway;
if (actors.contains(akkaClient.getActorRef())) {
akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaActor.java
new file mode 100644
index 0000000..745f3ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaActor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
+
+public class RunnableAkkaActor extends UntypedActor {
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof RunnableMessage) {
+ ((RunnableMessage) message).getRunnable().run();
+ } else {
+ throw new RuntimeException("Unknown message " + message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaGateway.java
new file mode 100644
index 0000000..b7c379d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaGateway.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.rpc.RunnableRpcGateway;
+import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
+
+public abstract class RunnableAkkaGateway implements RunnableRpcGateway, AkkaGateway {
+ @Override
+ public void runAsync(Runnable runnable) {
+ getActorRef().tell(new RunnableMessage(runnable), ActorRef.noSender());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
index a91d7d4..a1bff44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.rpc.akka.jobmaster;
import akka.actor.ActorRef;
import akka.actor.Status;
-import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaActor;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.akka.messages.HandleRegistrationResponse;
import org.apache.flink.runtime.rpc.akka.messages.TriggerResourceManagerRegistration;
import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
-public class JobMasterAkkaActor extends UntypedActor {
+public class JobMasterAkkaActor extends RunnableAkkaActor {
private final JobMaster jobMaster;
public JobMasterAkkaActor(JobMaster jobMaster) {
@@ -57,7 +57,7 @@ public class JobMasterAkkaActor extends UntypedActor {
jobMaster.handleRegistrationResponse(registrationResponse.getRegistrationResponse(), registrationResponse.getResourceManagerGateway());
} else {
- throw new RuntimeException("Unknown message type received: " + message.getClass());
+ super.onReceive(message);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
index fe4b0c1..15dad2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.rpc.akka.jobmaster;
import akka.actor.ActorRef;
import akka.pattern.AskableActorRef;
import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaGateway;
import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.akka.AkkaGateway;
import org.apache.flink.runtime.rpc.akka.messages.HandleRegistrationResponse;
import org.apache.flink.runtime.rpc.akka.messages.TriggerResourceManagerRegistration;
import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import scala.concurrent.Future;
import scala.reflect.ClassTag$;
-public class JobMasterAkkaGateway implements JobMasterGateway, AkkaGateway {
+public class JobMasterAkkaGateway extends RunnableAkkaGateway implements JobMasterGateway {
private final AskableActorRef actorRef;
private final Timeout timeout;
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
new file mode 100644
index 0000000..3556738
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+public class RunnableMessage {
+ private final Runnable runnable;
+
+ public RunnableMessage(Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ public Runnable getRunnable() {
+ return runnable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
index 7fc2ffa..9eef6ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.rpc.akka.resourcemanager;
import akka.actor.ActorRef;
import akka.actor.Status;
-import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaActor;
import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
-public class ResourceManagerAkkaActor extends UntypedActor {
+public class ResourceManagerAkkaActor extends RunnableAkkaActor {
private final ResourceManager resourceManager;
public ResourceManagerAkkaActor(ResourceManager resourceManager) {
@@ -57,7 +57,7 @@ public class ResourceManagerAkkaActor extends UntypedActor {
sender.tell(new Status.Failure(e), getSelf());
}
} else {
- throw new RuntimeException("Encountered unknown message type: " + message.getClass());
+ super.onReceive(message);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
index c902bff..a02a070 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
@@ -21,18 +21,18 @@ package org.apache.flink.runtime.rpc.akka.resourcemanager;
import akka.actor.ActorRef;
import akka.pattern.AskableActorRef;
import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaGateway;
import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.akka.AkkaGateway;
import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
import scala.concurrent.Future;
import scala.reflect.ClassTag$;
-public class ResourceManagerAkkaGateway implements ResourceManagerGateway, AkkaGateway {
+public class ResourceManagerAkkaGateway extends RunnableAkkaGateway implements ResourceManagerGateway {
private final AskableActorRef actorRef;
private final Timeout timeout;
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
index 8974d2b..89dad34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.rpc.akka.taskexecutor;
import akka.actor.ActorRef;
import akka.actor.Status;
-import akka.actor.UntypedActor;
import akka.dispatch.OnComplete;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaActor;
import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
-public class TaskExecutorAkkaActor extends UntypedActor {
+public class TaskExecutorAkkaActor extends RunnableAkkaActor {
private final TaskExecutorGateway taskExecutor;
public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) {
@@ -71,7 +71,7 @@ public class TaskExecutorAkkaActor extends UntypedActor {
getContext().dispatcher()
);
} else {
- throw new RuntimeException("Encountered unknown message type: " + message.getClass());
+ super.onReceive(message);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
index ca064c1..0452804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
@@ -24,14 +24,14 @@ import akka.util.Timeout;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.akka.AkkaGateway;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaGateway;
import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
import scala.concurrent.Future;
import scala.reflect.ClassTag$;
-public class TaskExecutorAkkaGateway implements TaskExecutorGateway, AkkaGateway {
+public class TaskExecutorAkkaGateway extends RunnableAkkaGateway implements TaskExecutorGateway {
private final AskableActorRef actorRef;
private final Timeout timeout;
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
index c2e2686..e40c148 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -37,16 +37,14 @@ import scala.concurrent.Future;
import java.util.concurrent.ExecutorService;
-public class JobMaster implements RpcServer<JobMasterGateway> {
+public class JobMaster extends RpcServer<JobMasterGateway> {
private final Logger LOG = LoggerFactory.getLogger(JobMaster.class);
- private final RpcService rpcService;
private final ExecutionContext executionContext;
- private JobMasterGateway self;
private ResourceManagerGateway resourceManager = null;
public JobMaster(RpcService rpcService, ExecutorService executorService) {
- this.rpcService = rpcService;
+ super(rpcService);
executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
}
@@ -54,17 +52,6 @@ public class JobMaster implements RpcServer<JobMasterGateway> {
return resourceManager;
}
- @Override
- public void start() {
- // start rpc server
- self = rpcService.startServer(this, JobMasterGateway.class);
- }
-
- @Override
- public void shutDown() {
- rpcService.stopServer(getSelf());
- }
-
@RpcMethod
public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) {
System.out.println("TaskExecutionState: " + taskExecutionState);
@@ -73,12 +60,19 @@ public class JobMaster implements RpcServer<JobMasterGateway> {
@RpcMethod
public void triggerResourceManagerRegistration(final String address) {
- Future<ResourceManagerGateway> resourceManagerFuture = rpcService.connect(address, ResourceManagerGateway.class);
+ Future<ResourceManagerGateway> resourceManagerFuture = getRpcService().connect(address, ResourceManagerGateway.class);
Future<RegistrationResponse> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<RegistrationResponse>>() {
@Override
- public Future<RegistrationResponse> apply(ResourceManagerGateway resourceManagerGateway) {
- return resourceManagerGateway.registerJobMaster(new JobMasterRegistration());
+ public Future<RegistrationResponse> apply(final ResourceManagerGateway resourceManagerGateway) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ resourceManager = resourceManagerGateway;
+ }
+ });
+
+ return resourceManagerGateway.registerJobMaster(new JobMasterRegistration());
}
}, executionContext);
@@ -103,8 +97,4 @@ public class JobMaster implements RpcServer<JobMasterGateway> {
public boolean isConnected() {
return resourceManager != null;
}
-
- public JobMasterGateway getSelf() {
- return self;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index 00aad0d..bdcd8cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -26,25 +26,14 @@ import scala.concurrent.ExecutionContext$;
import java.util.concurrent.ExecutorService;
-public class ResourceManager implements RpcServer<ResourceManagerGateway> {
- private final RpcService rpcService;
+public class ResourceManager extends RpcServer<ResourceManagerGateway> {
private final ExecutionContext executionContext;
- private ResourceManagerGateway self;
-
public ResourceManager(RpcService rpcService, ExecutorService executorService) {
- this.rpcService = rpcService;
+ super(rpcService);
this.executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
}
- public void start() {
- self = rpcService.startServer(this, ResourceManagerGateway.class);
- }
-
- public void shutDown() {
- rpcService.stopServer(getSelf());
- }
-
@RpcMethod
public RegistrationResponse registerJobMaster(JobMasterRegistration jobMasterRegistration) {
System.out.println("JobMasterRegistration: " + jobMasterRegistration);
@@ -56,8 +45,4 @@ public class ResourceManager implements RpcServer<ResourceManagerGateway> {
System.out.println("SlotRequest: " + slotRequest);
return new SlotAssignment();
}
-
- public ResourceManagerGateway getSelf() {
- return self;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
index 5c5b8f4..ef0b180 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -31,26 +31,15 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-public class TaskExecutor implements RpcServer<TaskExecutorGateway> {
- private final RpcService rpcService;
+public class TaskExecutor extends RpcServer<TaskExecutorGateway> {
private final ExecutionContext executionContext;
private final Set<ExecutionAttemptID> tasks = new HashSet<>();
- private TaskExecutorGateway self;
-
public TaskExecutor(RpcService rpcService, ExecutorService executorService) {
- this.rpcService = rpcService;
+ super(rpcService);
this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(executorService);
}
- public void start() {
- self = rpcService.startServer(this, TaskExecutorGateway.class);
- }
-
- public void shutDown() {
- rpcService.stopServer(getSelf());
- }
-
@RpcMethod
public Acknowledge executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
tasks.add(taskDeploymentDescriptor.getExecutionId());
@@ -65,8 +54,4 @@ public class TaskExecutor implements RpcServer<TaskExecutorGateway> {
throw new Exception("Could not find task.");
}
}
-
- public TaskExecutorGateway getSelf() {
- return self;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index e05bbac..27c8171 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -48,27 +48,25 @@ public class RpcCompletenessTest extends TestLogger {
for (Class<? extends RpcServer> rpcServer :classes){
c = rpcServer;
- Type[] interfaces = c.getGenericInterfaces();
+ Type superClass = c.getGenericSuperclass();
boolean foundRpcServerInterface = false;
- for (Type in: interfaces) {
- if (in instanceof ParameterizedType) {
- ParameterizedType parameterizedType = (ParameterizedType) in;
+ if (superClass instanceof ParameterizedType) {
+ ParameterizedType parameterizedType = (ParameterizedType) superClass;
- if (parameterizedType.getRawType() == RpcServer.class) {
- foundRpcServerInterface = true;
- Type[] typeArguments = parameterizedType.getActualTypeArguments();
+ if (parameterizedType.getRawType() == RpcServer.class) {
+ foundRpcServerInterface = true;
+ Type[] typeArguments = parameterizedType.getActualTypeArguments();
- assertEquals(1, typeArguments.length);
- assertTrue(typeArguments[0] instanceof Class<?>);
+ assertEquals(1, typeArguments.length);
+ assertTrue(typeArguments[0] instanceof Class<?>);
- Type rpcGatewayType = typeArguments[0];
+ Type rpcGatewayType = typeArguments[0];
- assertTrue(rpcGatewayType instanceof Class);
+ assertTrue(rpcGatewayType instanceof Class);
- checkCompleteness(rpcServer, (Class<?>) rpcGatewayType);
- }
+ checkCompleteness(rpcServer, (Class<?>) rpcGatewayType);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
index 0a6630d..1d7c971 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.util;
-import com.sun.xml.internal.ws.util.CompletedFuture;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -194,4 +192,43 @@ public class DirectExecutorService implements ExecutorService {
public void execute(Runnable command) {
command.run();
}
+
+ public static class CompletedFuture<V> implements Future<V> {
+ private final V value;
+ private final Exception exception;
+
+ public CompletedFuture(V value, Exception exception) {
+ this.value = value;
+ this.exception = exception;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ if (exception != null) {
+ throw new ExecutionException(exception);
+ } else {
+ return value;
+ }
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return get();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 479bb60..ada79eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,13 +187,6 @@ under the License.
<type>jar</type>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.reflections</groupId>
- <artifactId>reflections</artifactId>
- <version>0.9.10</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<!-- this section defines the module versions that are used if nothing else is specified. -->
@@ -407,6 +400,13 @@ under the License.
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ <version>0.9.10</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>