You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/04/16 23:28:36 UTC
[3/3] reef git commit: [REEF-2002] Create Java project for gRPC two
process bridge
[REEF-2002] Create Java project for gRPC two process bridge
This addressed the issue by
* Adding gRPC bridge implementation
* Implementing event handlers and objects that pass between application
and core driver.
JIRA:
[REEF-2002](https://issues.apache.org/jira/browse/REEF-2002)
Pull request:
This closes #1447
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ea249f7f
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ea249f7f
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ea249f7f
Branch: refs/heads/REEF-335
Commit: ea249f7f4572748556a285e6828405f9a0f7107f
Parents: fed6fb7
Author: Tyson Condie <tc...@apache.org>
Authored: Tue Apr 3 15:11:18 2018 -0700
Committer: Doug Service <do...@apache.org>
Committed: Mon Apr 16 23:24:39 2018 +0000
----------------------------------------------------------------------
lang/common/proto/bridge/ClientProtocol.proto | 108 +++
.../proto/bridge/DriverClientProtocol.proto | 168 +++++
.../proto/bridge/DriverCommonProtocol.proto | 44 ++
.../proto/bridge/DriverServiceProtocol.proto | 155 ++++
lang/java/reef-bridge-proto-java/pom.xml | 255 +++++++
.../client/DefaultDriverClientStopHandler.java | 43 ++
.../reef/bridge/client/DriverClientClock.java | 127 ++++
.../client/DriverClientConfiguration.java | 202 ++++++
.../bridge/client/DriverClientDispatcher.java | 231 ++++++
.../client/DriverClientEvaluatorRequestor.java | 59 ++
.../client/DriverClientExceptionHandler.java | 43 ++
.../bridge/client/IAlarmDispatchHandler.java | 30 +
.../bridge/client/IDriverClientService.java | 45 ++
.../bridge/client/IDriverServiceClient.java | 132 ++++
.../reef/bridge/client/JVMClientProcess.java | 121 ++++
.../bridge/client/JavaDriverClientLauncher.java | 217 ++++++
.../client/events/ActiveContextBridge.java | 102 +++
.../client/events/AllocatedEvaluatorBridge.java | 166 +++++
.../client/events/ClosedContextBridge.java | 77 ++
.../client/events/CompletedEvaluatorBridge.java | 39 +
.../client/events/CompletedTaskBridge.java | 61 ++
.../client/events/ContextMessageBridge.java | 69 ++
.../client/events/FailedContextBridge.java | 110 +++
.../client/events/FailedEvaluatorBridge.java | 75 ++
.../bridge/client/events/RunningTaskBridge.java | 90 +++
.../bridge/client/events/TaskMessageBridge.java | 78 ++
.../reef/bridge/client/events/package-info.java | 22 +
.../grpc/DriverClientGrpcConfiguration.java | 42 ++
.../bridge/client/grpc/DriverClientService.java | 458 ++++++++++++
.../bridge/client/grpc/DriverServiceClient.java | 225 ++++++
.../reef/bridge/client/grpc/package-info.java | 22 +
.../grpc/parameters/DriverServicePort.java | 29 +
.../client/grpc/parameters/package-info.java | 22 +
.../apache/reef/bridge/client/package-info.java | 22 +
.../parameters/ClientDriverStopHandler.java | 36 +
.../DriverClientDispatchThreadCount.java | 30 +
.../bridge/client/parameters/package-info.java | 22 +
.../examples/WindowsRuntimePathProvider.java | 43 ++
.../reef/bridge/examples/hello/HelloDriver.java | 83 +++
.../reef/bridge/examples/hello/HelloREEF.java | 75 ++
.../reef/bridge/examples/hello/HelloTask.java | 39 +
.../bridge/examples/hello/package-info.java | 22 +
.../reef/bridge/examples/package-info.java | 22 +
.../bridge/service/DriverClientException.java | 30 +
.../service/DriverServiceConfiguration.java | 47 ++
.../bridge/service/DriverServiceHandlers.java | 236 +++++++
.../bridge/service/DriverServiceLauncher.java | 328 +++++++++
.../reef/bridge/service/IDriverService.java | 138 ++++
.../reef/bridge/service/RuntimeNames.java | 36 +
.../bridge/service/grpc/GRPCDriverService.java | 706 +++++++++++++++++++
.../reef/bridge/service/grpc/package-info.java | 22 +
.../reef/bridge/service/package-info.java | 22 +
.../service/parameters/DriverClientCommand.java | 31 +
.../bridge/service/parameters/package-info.java | 22 +
.../evaluator/EvaluatorDescriptorImpl.java | 4 +-
.../common/launch/JavaLaunchCommandBuilder.java | 2 +-
.../ports/parameters/TcpPortRangeBegin.java | 3 +-
.../ports/parameters/TcpPortRangeCount.java | 3 +-
.../ports/parameters/TcpPortRangeTryCount.java | 3 +-
pom.xml | 4 +
60 files changed, 5692 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/common/proto/bridge/ClientProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/bridge/ClientProtocol.proto b/lang/common/proto/bridge/ClientProtocol.proto
new file mode 100644
index 0000000..68bdcaa
--- /dev/null
+++ b/lang/common/proto/bridge/ClientProtocol.proto
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+syntax = "proto3";
+
+// option java_generic_services = true;
+// option java_multiple_files = true;
+option java_package = "org.apache.reef.bridge.proto";
+option java_outer_classname = "ClientProtocol";
+option csharp_namespace = "Org.Apache.REEF.Bridge.Proto";
+
+package driverbridge;
+
+message LocalRuntimeParameters {
+ uint32 max_number_of_evaluators = 1;
+ string runtime_root_folder = 2;
+ string jvm_heap_slack = 3;
+ repeated string rack_names = 4;
+}
+
+message YarnRuntimeParameters {
+ string queue = 1;
+ string job_submission_directory_prefix = 2;
+}
+
+message AzureBatchRuntimeParameters {
+
+}
+
+message MesosRuntimeParameters {
+
+}
+
+message DriverClientConfiguration {
+ string jobid = 1;
+
+ // driver machine resources
+ uint32 cpu_cores = 2;
+ uint32 memory_mb = 3;
+
+ // the runtime on which to launch
+ oneof runtime {
+ LocalRuntimeParameters local_runtime = 4;
+ YarnRuntimeParameters yarn_runtime = 5;
+ AzureBatchRuntimeParameters azbatch_runtime = 6;
+ MesosRuntimeParameters mesos_runtime = 7;
+ }
+
+ // The command to launch the driver client
+ string driver_client_launch_command = 10;
+
+ enum Handlers {
+ // control events
+ START = 0;
+ STOP = 1;
+
+ // evaluator events
+ EVALUATOR_ALLOCATED = 5;
+ EVALUATOR_COMPLETED = 6;
+ EVALUATOR_FAILED = 7;
+
+ // context events
+ CONTEXT_ACTIVE = 10;
+ CONTEXT_CLOSED = 11;
+ CONTEXT_FAILED = 12;
+ CONTEXT_MESSAGE = 13;
+
+ // task events
+ TASK_RUNNING = 15;
+ TASK_FAILED = 16;
+ TASK_COMPLETED = 17;
+ TASK_MESSAGE = 18;
+
+ // client events
+ CLIENT_MESSAGE = 20;
+ CLIENT_CLOSE = 21;
+ CLIENT_CLOSE_WITH_MESSAGE = 22;
+ }
+ repeated Handlers handler = 11;
+
+ // TCP port range
+ uint32 tcp_port_range_begin = 15;
+ uint32 tcp_port_range_count = 16;
+ uint32 tcp_port_range_try_count = 17;
+
+ // file dependencies
+ repeated string global_files = 20;
+ repeated string local_files = 21;
+ repeated string global_libraries = 22;
+ repeated string local_libraries = 23;
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/common/proto/bridge/DriverClientProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/bridge/DriverClientProtocol.proto b/lang/common/proto/bridge/DriverClientProtocol.proto
new file mode 100644
index 0000000..f80cff2
--- /dev/null
+++ b/lang/common/proto/bridge/DriverClientProtocol.proto
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+syntax = "proto3";
+
+// option java_generic_services = true;
+option java_multiple_files = true;
+option java_package = "org.apache.reef.bridge.proto";
+option java_outer_classname = "DriverClientProtocol";
+option csharp_namespace = "Org.Apache.REEF.Bridge.Proto";
+
+package driverbridge;
+
+import "DriverCommonProtocol.proto";
+
+// The driver client service definition.
+service DriverClient {
+ // Inquire if idle
+ rpc IdlenessCheckHandler (Void) returns (IdleStatus) {}
+
+ // Start, stop and alarm handlers
+ rpc StartHandler (StartTimeInfo) returns (Void) {}
+
+ rpc StopHandler (StopTimeInfo) returns (Void) {}
+
+ rpc AlarmTrigger (AlarmTriggerInfo) returns (Void) {}
+
+ // Evaluator handlers
+ rpc AllocatedEvaluatorHandler (EvaluatorInfo) returns (Void) {}
+
+ rpc CompletedEvaluatorHandler (EvaluatorInfo) returns (Void) {}
+
+ rpc FailedEvaluatorHandler (EvaluatorInfo) returns (Void) {}
+
+ // Context handlers
+ rpc ActiveContextHandler (ContextInfo) returns (Void) {}
+
+ rpc ClosedContextHandler (ContextInfo) returns (Void) {}
+
+ rpc FailedContextHandler (ContextInfo) returns (Void) {}
+
+ rpc ContextMessageHandler (ContextMessageInfo) returns (Void) {}
+
+ // Task handlers
+ rpc RunningTaskHandler (TaskInfo) returns (Void) {}
+
+ rpc FailedTaskHandler (TaskInfo) returns (Void) {}
+
+ rpc CompletedTaskHandler (TaskInfo) returns (Void) {}
+
+ rpc SuspendedTaskHandler (TaskInfo) returns (Void) {}
+
+ rpc TaskMessageHandler (TaskMessageInfo) returns (Void) {}
+
+ // Client Handlers
+ rpc ClientMessageHandler (ClientMessageInfo) returns (Void) {}
+
+ rpc ClientCloseHandler (Void) returns (Void) {}
+
+ rpc ClientCloseWithMessageHandler (ClientMessageInfo) returns (Void) {}
+}
+
+// IdleStatus response to idleness inquiry
+message IdleStatus {
+ bool is_idle = 1;
+ string reason = 2;
+}
+
+// The start time passed to the start handler.
+message StartTimeInfo {
+ int64 start_time = 1;
+}
+
+message StopTimeInfo {
+ int64 stop_time = 1;
+}
+
+// Information associated with an alarm that was set.
+message AlarmTriggerInfo {
+ string alarm_id = 1;
+}
+
+message EvaluatorDescriptorInfo {
+ // the amount of memory allocated
+ int32 memory = 1;
+
+ // the number of virtual cores allocated
+ int32 cores = 2;
+
+ // name of the runtime
+ string runtime_name = 3;
+}
+
+message EvaluatorInfo {
+ string evaluator_id = 1;
+
+ message FailureInfo {
+ string message = 1;
+ repeated string failedContexts = 2;
+ string failedTaskId = 3;
+ }
+ FailureInfo failure = 2;
+
+ EvaluatorDescriptorInfo descriptor_info = 3;
+}
+
+message ContextInfo {
+ string context_id = 1;
+
+ string evaluator_id = 2;
+
+ string parent_id = 3;
+
+ // Optional exception information
+ ExceptionInfo exception = 5;
+}
+
+message ContextMessageInfo {
+ string context_id = 1;
+
+ bytes payload = 2;
+
+ int64 sequence_number = 3;
+
+ string message_source_id = 4;
+}
+
+message TaskInfo {
+ string task_id = 1;
+
+ string context_id = 2;
+
+ bytes result = 3;
+
+ ExceptionInfo exception = 5;
+}
+
+message TaskMessageInfo {
+ string task_id = 1;
+
+ bytes payload = 2;
+
+ int64 sequence_number = 3;
+
+ string context_id = 4;
+
+ string message_source_id = 5;
+}
+
+message ClientMessageInfo {
+ bytes payload = 1;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/common/proto/bridge/DriverCommonProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/bridge/DriverCommonProtocol.proto b/lang/common/proto/bridge/DriverCommonProtocol.proto
new file mode 100644
index 0000000..7ec8905
--- /dev/null
+++ b/lang/common/proto/bridge/DriverCommonProtocol.proto
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+syntax = "proto3";
+
+// option java_generic_services = true;
+option java_multiple_files = true;
+option java_package = "org.apache.reef.bridge.proto";
+option csharp_namespace = "Org.Apache.REEF.Bridge.Proto";
+
+package driverbridge;
+
+// Void message type
+message Void {}
+
+message ExceptionInfo {
+ // Exception name/type
+ string name = 1;
+
+ // Exception message
+ string message = 2;
+
+ // Stack trace
+ repeated string stack_trace = 3;
+
+ // Data associated with exception
+ bytes data = 4;
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/common/proto/bridge/DriverServiceProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/bridge/DriverServiceProtocol.proto b/lang/common/proto/bridge/DriverServiceProtocol.proto
new file mode 100644
index 0000000..7f6da24
--- /dev/null
+++ b/lang/common/proto/bridge/DriverServiceProtocol.proto
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+syntax = "proto3";
+
+// option java_generic_services = true;
+option java_multiple_files = true;
+option java_package = "org.apache.reef.bridge.proto";
+option java_outer_classname = "DriverBridgeProtocol";
+option csharp_namespace = "Org.Apache.REEF.Bridge.Proto";
+
+package driverbridge;
+
+import "DriverCommonProtocol.proto";
+
+// The java driver service definition.
+service DriverService {
+ // Driver client registration
+ rpc RegisterDriverClient (DriverClientRegistration) returns (Void) {}
+
+ // Request for resources
+ rpc RequestResources (ResourceRequest) returns (Void) {}
+
+ // Request system shutdown
+ rpc Shutdown (ShutdownRequest) returns (Void) {}
+
+ // Request for setting an alarm
+ rpc SetAlarm (AlarmRequest) returns (Void) {}
+
+ // Request operation on an allocated evaluator
+ rpc AllocatedEvaluatorOp (AllocatedEvaluatorRequest) returns (Void) {}
+
+ // Request operation on an active context
+ rpc ActiveContextOp (ActiveContextRequest) returns (Void) {}
+
+ // Request operation on a running task
+ rpc RunningTaskOp (RunningTaskRequest) returns (Void) {}
+}
+
+message DriverClientRegistration {
+ // The client's host
+ string host = 1;
+
+ // The client's server port
+ int32 port = 2;
+}
+
+// The request message containing resource request.
+message ResourceRequest {
+ repeated string node_name_list = 1;
+
+ repeated string rack_name_list = 2;
+
+ int32 resource_count = 3;
+
+ int32 memory_size = 4;
+
+ int32 priority = 5;
+
+ int32 cores = 6;
+
+ bool relax_locality = 7;
+
+ string runtime_name = 8;
+}
+
+// Request for an alarm to be set
+message AlarmRequest {
+ // used to uniquely identify the alarm
+ string alarm_id = 1;
+
+ // timeout in milliseconds
+ int32 timeout_ms = 2;
+}
+
+message ShutdownRequest {
+ ExceptionInfo exception = 1;
+}
+
+message AllocatedEvaluatorRequest {
+ // The evaluator used to submit
+ string evaluator_id = 1;
+
+ bool close_evaluator = 2;
+
+ repeated string add_files = 3;
+
+ repeated string add_libraries = 4;
+
+ // Evaluator configuration
+ string evaluator_configuration = 5;
+
+ // Context configuration
+ string context_configuration = 6;
+
+ // Task configuration
+ string task_configuration = 7;
+
+ message EvaluatorProcessRequest {
+ int32 memory_mb = 1;
+
+ string configuration_file_name = 2;
+
+ string standard_out = 3;
+
+ string standard_err = 4;
+
+ repeated string options = 5;
+ }
+ EvaluatorProcessRequest set_process = 8;
+}
+
+message ActiveContextRequest {
+ string context_id = 1;
+
+ oneof operation {
+ // close the context
+ bool close_context = 2;
+
+ // send message to the context
+ bytes message = 3;
+
+ // create a child context
+ string new_context_request = 4;
+
+ // launch a task
+ string new_task_request = 5;
+ }
+}
+
+message RunningTaskRequest {
+ string task_id = 1;
+
+ // close the task
+ bool close_task = 2;
+
+ // send task a message
+ bytes message = 3;
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/pom.xml b/lang/java/reef-bridge-proto-java/pom.xml
new file mode 100644
index 0000000..f177b7c
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/pom.xml
@@ -0,0 +1,255 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>reef-bridge-proto-java</artifactId>
+ <name>REEF Bridge Protobuf Java</name>
+ <description>Protocol Buffer Bridge between JVM and CLR.</description>
+
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-project</artifactId>
+ <version>0.17.0-SNAPSHOT</version>
+ <relativePath>../../..</relativePath>
+ </parent>
+
+ <properties>
+ <rootPath>${basedir}/../../..</rootPath>
+ <protoPath>${rootPath}/lang/common/proto/bridge</protoPath>
+ <!-- protobuf paths -->
+ <protobuf.input.directory>${protoPath}</protobuf.input.directory>
+ <protobuf.output.directory>${project.build.directory}/generated-sources</protobuf.output.directory>
+
+ <!-- library versions -->
+ <maven.assembly>3.1.0</maven.assembly>
+ <grpc.version>1.10.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
+ <netty.version>4.1.17.Final</netty.version>
+ <build-helper-maven-plugin.version>1.9.1</build-helper-maven-plugin.version>
+ <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
+ <maven-dependency-plugin.version>2.10</maven-dependency-plugin.version>
+ <maven-shade-plugin.version>2.4.2</maven-shade-plugin.version>
+ <os-maven-plugin.version>1.4.1.Final</os-maven-plugin.version>
+ <protobuf.version>3.5.1</protobuf.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-webserver</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.9</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-all</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <!-- netty version used by grpc -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-runtime-yarn</artifactId>
+ <version>0.17.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <extensions>
+ <!-- provides os.detected.classifier (i.e. linux-x86_64, osx-x86_64) property -->
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>${os-maven-plugin.version}</version>
+ </extension>
+ </extensions>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <configLocation>lang/java/reef-common/src/main/resources/checkstyle-strict.xml</configLocation>
+ </configuration>
+ </plugin>
+ <!-- Unfortunately, we need to run this plugin twice (separately) to generate gRPC service definitions
+ and protocol buffer message definitions. Combining them in one call has a bug in <clearOutputDirectory>
+ which we do only on the first run.
+ -->
+ <!-- Generate gRPC definitions -->
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.5.1</version>
+ <configuration>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ <protoSourceRoot>${protobuf.input.directory}</protoSourceRoot>
+ <outputDirectory>${protobuf.output.directory}</outputDirectory>
+ <clearOutputDirectory>true</clearOutputDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Generate protocol buffer message definitions -->
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.5.1</version>
+ <configuration>
+ <pluginId>grpc-java</pluginId>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+ <protoSourceRoot>${protobuf.input.directory}</protoSourceRoot>
+ <outputDirectory>${protobuf.output.directory}</outputDirectory>
+ <clearOutputDirectory>false</clearOutputDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- add generated grpc classes into the package -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>${build-helper-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>add-classes</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${protobuf.output.directory}</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- shade protobuf to avoid version conflicts -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${maven-shade-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+ <shadedPattern>${project.groupId}.${project.artifactId}.shaded.protobuf</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${maven-shade-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>${project.groupId}.${project.artifactId}.shaded.netty</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${maven.assembly}</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java
new file mode 100644
index 0000000..8636f7a
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default driver client stop handler: only logs the stop time at FINEST level.
+ */
+public final class DefaultDriverClientStopHandler implements EventHandler<StopTime> {
+
+ private static final Logger LOG = Logger.getLogger(DefaultDriverClientStopHandler.class.getName());
+
+ @Inject
+ private DefaultDriverClientStopHandler() {}
+
+ @Override
+ public void onNext(final StopTime value) {
+ LOG.log(Level.FINEST, "Stop time {0}", value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java
new file mode 100644
index 0000000..162cbe5
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.Time;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.runtime.Timer;
+import org.apache.reef.wake.time.runtime.event.ClientAlarm;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The bridge driver client clock; an alarm runs when onNext receives its id.
+ */
+public final class DriverClientClock implements Clock, IAlarmDispatchHandler {
+
+ private static final Logger LOG = Logger.getLogger(DriverClientClock.class.getName());
+
+ private final IDriverClientService driverClientService;
+ private final IDriverServiceClient driverServiceClient;
+ private final Timer timer;
+ // Scheduled alarms that have not been dispatched yet, keyed by alarm id.
+ private final Map<String, ClientAlarm> alarmMap = new HashMap<>();
+ private boolean closed = false;
+
+ @Inject
+ private DriverClientClock(
+ final Timer timer,
+ final IDriverClientService driverClientService,
+ final IDriverServiceClient driverServiceClient) {
+ this.timer = timer;
+ this.driverClientService = driverClientService;
+ this.driverServiceClient = driverServiceClient;
+ }
+
+ @Override
+ public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
+ final ClientAlarm alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler);
+ final String alarmId = UUID.randomUUID().toString();
+ this.alarmMap.put(alarmId, alarm);
+ this.driverServiceClient.onSetAlarm(alarmId, offset);
+ return alarm;
+ }
+
+ @Override
+ public void close() {
+ stop();
+ }
+
+ @Override
+ public void stop() {
+ if (!closed) {
+ this.closed = true;
+ this.driverServiceClient.onShutdown();
+ }
+ }
+
+ @Override
+ public void stop(final Throwable exception) {
+ if (!closed) {
+ this.closed = true;
+ this.driverServiceClient.onShutdown(exception);
+ }
+ }
+
+ @Override
+ public boolean isIdle() {
+ return this.closed && this.alarmMap.isEmpty();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return this.closed;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.driverClientService.start();
+ this.driverClientService.awaitTermination();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Alarm clock event handler: unregisters the alarm with the given id and runs it.
+ * @param alarmId alarm identifier
+ */
+ @Override
+ public void onNext(final String alarmId) {
+ final ClientAlarm clientAlarm = this.alarmMap.remove(alarmId);
+ if (clientAlarm != null) {
+ clientAlarm.run();
+ } else {
+ LOG.log(Level.SEVERE, "Unknown alarm id {0}", alarmId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java
new file mode 100644
index 0000000..50da3ce
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.bridge.client.parameters.DriverClientDispatchThreadCount;
+import org.apache.reef.bridge.client.parameters.ClientDriverStopHandler;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalImpl;
+import org.apache.reef.tang.formats.RequiredImpl;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+/**
+ * Driver client configuration.
+ * Declares the handler and service slots an application can fill in, and binds them in
+ * {@link #CONF} to the named parameters and interfaces used by the driver client.
+ */
+public final class DriverClientConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * The event handler invoked right after the driver boots up.
+ */
+ public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
+
+ /**
+ * The event handler invoked right before the driver shuts down. Defaults to ignore.
+ */
+ public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
+
+ // ***** EVALUATOR HANDLER BINDINGS:
+
+ /**
+ * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
+ */
+ public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>();
+
+ /**
+ * Event handler for completed evaluators. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed evaluators. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>();
+
+ // ***** TASK HANDLER BINDINGS:
+
+ /**
+ * Event handler for task messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
+ */
+ public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed tasks. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>();
+
+ /**
+ * Event handler for running tasks. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
+
+ /**
+ * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
+ * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
+ */
+ public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>();
+
+ // ***** CLIENT HANDLER BINDINGS:
+
+ /**
+ * Event handler for client messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Event handler for close messages sent by the client. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<Void>> ON_CLIENT_CLOSED = new OptionalImpl<>();
+
+ /**
+ * Event handler for close messages sent by the client that carry a byte[] payload.
+ * Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_CLOSED_MESSAGE = new OptionalImpl<>();
+
+ // ***** CONTEXT HANDLER BINDINGS:
+
+ /**
+ * Event handler for active context. Defaults to closing the context if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
+
+ /**
+ * Event handler for closed context. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed context. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>();
+
+ /**
+ * Event handler for context messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Number of dispatch threads to use. Bound to the DriverClientDispatchThreadCount named parameter.
+ */
+ public static final OptionalImpl<Integer> CLIENT_DRIVER_DISPATCH_THREAD_COUNT = new OptionalImpl<>();
+
+ /**
+ * Alarm dispatch handler. Bound as the implementation of IAlarmDispatchHandler.
+ */
+ public static final OptionalImpl<IAlarmDispatchHandler> ALARM_DISPATCH_HANDLER = new OptionalImpl<>();
+
+ /**
+ * Driver client service implementation. Defaults to the gRPC Driver Client Service.
+ */
+ public static final OptionalImpl<IDriverClientService> DRIVER_CLIENT_SERVICE = new OptionalImpl<>();
+
+ /**
+ * Driver service client implementation. Defaults to the gRPC Driver Service Client.
+ */
+ public static final OptionalImpl<IDriverServiceClient> DRIVER_SERVICE_CLIENT = new OptionalImpl<>();
+
+ /**
+ * ConfigurationModule to fill out to get a legal Driver Client Configuration.
+ * Declared after all of the slots above because it refers to them during static initialization.
+ */
+ public static final ConfigurationModule CONF = new DriverClientConfiguration()
+ .bindImplementation(Clock.class, DriverClientClock.class)
+ .bindImplementation(EvaluatorRequestor.class, DriverClientEvaluatorRequestor.class)
+ .bindImplementation(IAlarmDispatchHandler.class, ALARM_DISPATCH_HANDLER)
+ .bindImplementation(IDriverClientService.class, DRIVER_CLIENT_SERVICE)
+ .bindImplementation(IDriverServiceClient.class, DRIVER_SERVICE_CLIENT)
+
+ .bindNamedParameter(DriverClientDispatchThreadCount.class, CLIENT_DRIVER_DISPATCH_THREAD_COUNT)
+
+ // Driver start/stop handlers
+ .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED)
+ .bindSetEntry(ClientDriverStopHandler.class, ON_DRIVER_STOP)
+
+ // Evaluator handlers
+ .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED)
+ .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED)
+ .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED)
+
+ // Task handlers
+ .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING)
+ .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED)
+ .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE)
+ .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED)
+ .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED)
+
+ // Context handlers
+ .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
+ .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED)
+ .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
+ .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED)
+
+ // Client handlers
+ .bindSetEntry(ClientMessageHandlers.class, ON_CLIENT_MESSAGE)
+ .bindSetEntry(ClientCloseHandlers.class, ON_CLIENT_CLOSED)
+ .bindSetEntry(ClientCloseWithMessageHandlers.class, ON_CLIENT_CLOSED_MESSAGE)
+
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java
new file mode 100644
index 0000000..3dd9b88
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import com.google.common.collect.Sets;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.bridge.client.parameters.DriverClientDispatchThreadCount;
+import org.apache.reef.bridge.client.parameters.ClientDriverStopHandler;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.utils.DispatchingEStage;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+/**
+ * Async dispatch of client driver events.
+ * Application handler sets are registered by event class on one {@link DispatchingEStage};
+ * client close, client close-with-message, client message and alarm events each get a
+ * stage of their own that is constructed from the application stage.
+ */
+@Private
+public final class DriverClientDispatcher {
+
+ /**
+ * Dispatcher used for application provided event handlers.
+ */
+ private final DispatchingEStage applicationDispatcher;
+
+ /**
+ * Dispatcher for client close events.
+ */
+ private final DispatchingEStage clientCloseDispatcher;
+
+ /**
+ * Dispatcher for client close with message events.
+ */
+ private final DispatchingEStage clientCloseWithMessageDispatcher;
+
+ /**
+ * Dispatcher for client messages.
+ */
+ private final DispatchingEStage clientMessageDispatcher;
+
+ /**
+ * The alarm dispatcher.
+ */
+ private final DispatchingEStage alarmDispatcher;
+
+ /**
+ * Builds the dispatch stages and registers every injected handler set under its event class.
+ */
+ @Inject
+ private DriverClientDispatcher(
+ final DriverClientExceptionHandler driverExceptionHandler,
+ final IAlarmDispatchHandler alarmDispatchHandler,
+ @Parameter(DriverClientDispatchThreadCount.class)
+ final Integer numberOfThreads,
+ // Application-provided start and stop handlers
+ @Parameter(DriverStartHandler.class)
+ final Set<EventHandler<StartTime>> startHandlers,
+ @Parameter(ClientDriverStopHandler.class)
+ final Set<EventHandler<StopTime>> stopHandlers,
+ // Application-provided Context event handlers
+ @Parameter(ContextActiveHandlers.class)
+ final Set<EventHandler<ActiveContext>> contextActiveHandlers,
+ @Parameter(ContextClosedHandlers.class)
+ final Set<EventHandler<ClosedContext>> contextClosedHandlers,
+ @Parameter(ContextFailedHandlers.class)
+ final Set<EventHandler<FailedContext>> contextFailedHandlers,
+ @Parameter(ContextMessageHandlers.class)
+ final Set<EventHandler<ContextMessage>> contextMessageHandlers,
+ // Application-provided Task event handlers
+ @Parameter(TaskRunningHandlers.class)
+ final Set<EventHandler<RunningTask>> taskRunningHandlers,
+ @Parameter(TaskCompletedHandlers.class)
+ final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
+ @Parameter(TaskSuspendedHandlers.class)
+ final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
+ @Parameter(TaskMessageHandlers.class)
+ final Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
+ @Parameter(TaskFailedHandlers.class)
+ final Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
+ // Application-provided Evaluator event handlers
+ @Parameter(EvaluatorAllocatedHandlers.class)
+ final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
+ @Parameter(EvaluatorFailedHandlers.class)
+ final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
+ @Parameter(EvaluatorCompletedHandlers.class)
+ final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
+ // Client handlers
+ @Parameter(ClientCloseHandlers.class)
+ final Set<EventHandler<Void>> clientCloseHandlers,
+ @Parameter(ClientCloseWithMessageHandlers.class)
+ final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers,
+ @Parameter(ClientMessageHandlers.class)
+ final Set<EventHandler<byte[]>> clientMessageHandlers) {
+
+ this.applicationDispatcher = new DispatchingEStage(
+ driverExceptionHandler, numberOfThreads, "ClientDriverDispatcher");
+ // Application start and stop handlers
+ this.applicationDispatcher.register(StartTime.class, startHandlers);
+ this.applicationDispatcher.register(StopTime.class, stopHandlers);
+ // Application Context event handlers
+ this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
+ this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
+ this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
+ this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);
+
+ // Application Task event handlers.
+ this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
+ this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
+ this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
+ this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
+ this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);
+
+ // Application Evaluator event handlers
+ this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);
+ this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers);
+ this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers);
+
+ // Client event handlers
+ // Each of the stages below is constructed from the application stage
+ // (presumably sharing its resources - confirm against DispatchingEStage).
+ this.clientCloseDispatcher = new DispatchingEStage(this.applicationDispatcher);
+ this.clientCloseDispatcher.register(Void.class, clientCloseHandlers);
+
+ // Close-with-message and client-message handlers are both registered under byte[].class,
+ // each on its own stage.
+ this.clientCloseWithMessageDispatcher = new DispatchingEStage(this.applicationDispatcher);
+ this.clientCloseWithMessageDispatcher.register(byte[].class, clientCloseWithMessageHandlers);
+
+ this.clientMessageDispatcher = new DispatchingEStage(this.applicationDispatcher);
+ this.clientMessageDispatcher.register(byte[].class, clientMessageHandlers);
+
+ // Alarm event handlers
+ // The handler is registered as the only element of a set, under String.class.
+ this.alarmDispatcher = new DispatchingEStage(this.applicationDispatcher);
+ this.alarmDispatcher.register(String.class,
+ Sets.newHashSet((EventHandler<String>)alarmDispatchHandler));
+ }
+
+ /** Passes a StartTime event to the application dispatcher. */
+ public void dispatch(final StartTime startTime) {
+ this.applicationDispatcher.onNext(StartTime.class, startTime);
+ }
+
+ /** Passes a StopTime event to the application dispatcher. */
+ public void dispatch(final StopTime stopTime) {
+ this.applicationDispatcher.onNext(StopTime.class, stopTime);
+ }
+
+ /** Passes an ActiveContext event to the application dispatcher. */
+ public void dispatch(final ActiveContext context) {
+ this.applicationDispatcher.onNext(ActiveContext.class, context);
+ }
+
+ /** Passes a ClosedContext event to the application dispatcher. */
+ public void dispatch(final ClosedContext context) {
+ this.applicationDispatcher.onNext(ClosedContext.class, context);
+ }
+
+ /** Passes a FailedContext event to the application dispatcher. */
+ public void dispatch(final FailedContext context) {
+ this.applicationDispatcher.onNext(FailedContext.class, context);
+ }
+
+ /** Passes a ContextMessage event to the application dispatcher. */
+ public void dispatch(final ContextMessage message) {
+ this.applicationDispatcher.onNext(ContextMessage.class, message);
+ }
+
+ /** Passes an AllocatedEvaluator event to the application dispatcher. */
+ public void dispatch(final AllocatedEvaluator evaluator) {
+ this.applicationDispatcher.onNext(AllocatedEvaluator.class, evaluator);
+ }
+
+ /** Passes a FailedEvaluator event to the application dispatcher. */
+ public void dispatch(final FailedEvaluator evaluator) {
+ this.applicationDispatcher.onNext(FailedEvaluator.class, evaluator);
+ }
+
+ /** Passes a CompletedEvaluator event to the application dispatcher. */
+ public void dispatch(final CompletedEvaluator evaluator) {
+ this.applicationDispatcher.onNext(CompletedEvaluator.class, evaluator);
+ }
+
+ /** Passes a RunningTask event to the application dispatcher. */
+ public void dispatch(final RunningTask task) {
+ this.applicationDispatcher.onNext(RunningTask.class, task);
+ }
+
+ /** Passes a CompletedTask event to the application dispatcher. */
+ public void dispatch(final CompletedTask task) {
+ this.applicationDispatcher.onNext(CompletedTask.class, task);
+ }
+
+ /** Passes a FailedTask event to the application dispatcher. */
+ public void dispatch(final FailedTask task) {
+ this.applicationDispatcher.onNext(FailedTask.class, task);
+ }
+
+ /** Passes a SuspendedTask event to the application dispatcher. */
+ public void dispatch(final SuspendedTask task) {
+ this.applicationDispatcher.onNext(SuspendedTask.class, task);
+ }
+
+ /** Passes a TaskMessage event to the application dispatcher. */
+ public void dispatch(final TaskMessage message) {
+ this.applicationDispatcher.onNext(TaskMessage.class, message);
+ }
+
+ /** Signals a client close on the client close dispatcher; the event value itself is null. */
+ public void clientCloseDispatch() {
+ this.clientCloseDispatcher.onNext(Void.class, null);
+ }
+
+ /** Passes the message bytes of a client close to the close-with-message dispatcher. */
+ public void clientCloseWithMessageDispatch(final byte[] message) {
+ this.clientCloseWithMessageDispatcher.onNext(byte[].class, message);
+ }
+
+ /** Passes client message bytes to the client message dispatcher. */
+ public void clientMessageDispatch(final byte[] message) {
+ this.clientMessageDispatcher.onNext(byte[].class, message);
+ }
+
+ /** Passes an alarm identifier to the alarm dispatcher. */
+ public void dispatchAlarm(final String alarmId) {
+ this.alarmDispatcher.onNext(String.class, alarmId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java
new file mode 100644
index 0000000..a774b2f
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+
+import javax.inject.Inject;
+
+/**
+ * {@link EvaluatorRequestor} for the driver client.
+ * Evaluator requests are handed to the {@link IDriverServiceClient}.
+ */
+public final class DriverClientEvaluatorRequestor implements EvaluatorRequestor {
+
+  /** Client through which evaluator requests are forwarded. */
+  private final IDriverServiceClient driverServiceClient;
+
+  @Inject
+  private DriverClientEvaluatorRequestor(final IDriverServiceClient driverServiceClient) {
+    this.driverServiceClient = driverServiceClient;
+  }
+
+  /**
+   * Forwards the request to the driver service client.
+   * @param req evaluator request to submit
+   */
+  @Override
+  public void submit(final EvaluatorRequest req) {
+    driverServiceClient.onEvaluatorRequest(req);
+  }
+
+  /**
+   * @return a new request builder whose submit() goes through this requestor
+   */
+  @Override
+  public EvaluatorRequest.Builder newRequest() {
+    return new Builder();
+  }
+
+  /**
+   * {@link EvaluatorRequest.Builder} extended with a submit method.
+   * {@link EvaluatorRequest}s are built using this builder.
+   */
+  public final class Builder extends EvaluatorRequest.Builder<DriverClientEvaluatorRequestor.Builder> {
+    @Override
+    public synchronized void submit() {
+      // Build first, then hand the finished request to the enclosing requestor.
+      final EvaluatorRequest request = build();
+      DriverClientEvaluatorRequestor.this.submit(request);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java
new file mode 100644
index 0000000..d09ce41
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver client exception handler.
+ * Logs every throwable that is handed to it.
+ */
+public final class DriverClientExceptionHandler implements EventHandler<Throwable> {
+  private static final Logger LOG = Logger.getLogger(DriverClientExceptionHandler.class.getName());
+
+  @Inject
+  private DriverClientExceptionHandler() {
+    LOG.log(Level.FINE, "Instantiated 'DriverExceptionHandler'");
+  }
+
+  /**
+   * Logs the throwable with its stack trace so that a failure delivered to this
+   * handler is not lost silently.
+   * @param throwable the error delivered to this handler
+   */
+  @Override
+  public void onNext(final Throwable throwable) {
+    LOG.log(Level.SEVERE, "Exception in driver client", throwable);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java
new file mode 100644
index 0000000..a9fee48
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Alarm dispatch handler.
+ * An {@link EventHandler} over String events (presumably alarm identifiers - confirm
+ * against the implementation); the default implementation is {@link DriverClientClock}.
+ */
+@DefaultImplementation(DriverClientClock.class)
+public interface IAlarmDispatchHandler extends EventHandler<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java
new file mode 100644
index 0000000..c71b554
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.bridge.client.grpc.DriverClientService;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+import java.io.IOException;
+
+/**
+ * Interface that driver client services implement.
+ * The default implementation is {@link DriverClientService} from the grpc package.
+ */
+@DefaultImplementation(DriverClientService.class)
+public interface IDriverClientService {
+
+ /**
+ * Start the DriverClient service.
+ * @throws IOException when unable to start service
+ */
+ void start() throws IOException;
+
+
+ /**
+ * Wait for termination of driver client service.
+ * @throws InterruptedException presumably when the waiting thread is interrupted
+ */
+ void awaitTermination() throws InterruptedException;
+
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java
new file mode 100644
index 0000000..e1f8cb7
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.bridge.client.grpc.DriverServiceClient;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Forwards application requests to driver server.
+ * The default implementation is {@link DriverServiceClient} from the grpc package.
+ */
+@DefaultImplementation(DriverServiceClient.class)
+public interface IDriverServiceClient {
+
+  /**
+   * Initiate shutdown.
+   */
+  void onShutdown();
+
+  /**
+   * Initiate shutdown with error.
+   * @param ex exception error
+   */
+  void onShutdown(final Throwable ex);
+
+  /**
+   * Set alarm.
+   * @param alarmId alarm identifier
+   * @param timeoutMS timeout in milliseconds
+   */
+  void onSetAlarm(final String alarmId, final int timeoutMS);
+
+  /**
+   * Request evaluators.
+   * @param evaluatorRequest event
+   */
+  void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest);
+
+  /**
+   * Close evaluator.
+   * @param evaluatorId to close
+   */
+  void onEvaluatorClose(final String evaluatorId);
+
+  /**
+   * Submit context and/or task.
+   * @param evaluatorId to submit against
+   * @param contextConfiguration context configuration
+   * @param taskConfiguration task configuration
+   * @param evaluatorProcess evaluator process
+   * @param addFileList to include
+   * @param addLibraryList to include
+   */
+  void onEvaluatorSubmit(
+      final String evaluatorId,
+      final Optional<Configuration> contextConfiguration,
+      final Optional<Configuration> taskConfiguration,
+      final Optional<JVMClientProcess> evaluatorProcess,
+      final Optional<List<File>> addFileList,
+      final Optional<List<File>> addLibraryList);
+
+  // Context Operations
+
+  /**
+   * Close context.
+   * @param contextId to close
+   */
+  void onContextClose(final String contextId);
+
+  /**
+   * Submit child context.
+   * @param contextId to submit against
+   * @param contextConfiguration for child context
+   */
+  void onContextSubmitContext(
+      final String contextId,
+      final Configuration contextConfiguration);
+
+  /**
+   * Submit task.
+   * @param contextId to submit against
+   * @param taskConfiguration for task
+   */
+  void onContextSubmitTask(
+      final String contextId,
+      final Configuration taskConfiguration);
+
+  /**
+   * Send message to context.
+   * @param contextId to destination context
+   * @param message to send
+   */
+  void onContextMessage(final String contextId, final byte[] message);
+
+  // Task operations
+
+  /**
+   * Close the task.
+   * @param taskId to close
+   * @param message optional message to include
+   */
+  void onTaskClose(final String taskId, final Optional<byte[]> message);
+
+  /**
+   * Send task a message.
+   * @param taskId of destination task
+   * @param message to send
+   */
+  void onTaskMessage(final String taskId, final byte[] message);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java
new file mode 100644
index 0000000..cdcb9b5
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.EvaluatorProcess;
+import org.apache.reef.driver.evaluator.EvaluatorType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Stub class for Evaluator Process on driver client.
+ * It only records the settings requested for a JVM evaluator process and
+ * exposes them through getters; it cannot produce a command line itself.
+ */
+@Private
+public final class JVMClientProcess implements EvaluatorProcess {
+
+  /** JVM options collected by {@link #addOption(String)}, in the order they were added. */
+  private final List<String> jvmOptions = new ArrayList<>();
+
+  /** Becomes true once {@link #setMemory(int)} or {@link #addOption(String)} has been called. */
+  private boolean anyOptionSet;
+
+  /** Memory in megabytes; 0 until {@link #setMemory(int)} is called. */
+  private int memoryMb;
+
+  /** Configuration file name; null until set. */
+  private String configFileName;
+
+  /** Standard output setting; null until set. */
+  private String stdOut;
+
+  /** Standard error setting; null until set. */
+  private String stdErr;
+
+  public JVMClientProcess() {
+  }
+
+  /**
+   * @return always {@link EvaluatorType#JVM}
+   */
+  @Override
+  public EvaluatorType getType() {
+    return EvaluatorType.JVM;
+  }
+
+  /**
+   * Not supported by this stub.
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public List<String> getCommandLine() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return true if memory was set or at least one JVM option was added
+   */
+  @Override
+  public boolean isOptionSet() {
+    return anyOptionSet;
+  }
+
+  /**
+   * Record the memory size and mark this process as having an option set.
+   * @param mb memory size in megabytes
+   * @return this
+   */
+  @Override
+  public JVMClientProcess setMemory(final int mb) {
+    anyOptionSet = true;
+    memoryMb = mb;
+    return this;
+  }
+
+  /**
+   * @return the memory size in megabytes last passed to {@link #setMemory(int)}, or 0
+   */
+  public int getMemory() {
+    return memoryMb;
+  }
+
+  /**
+   * Record the configuration file name.
+   * @param configurationFileName name of the configuration file
+   * @return this
+   */
+  @Override
+  public JVMClientProcess setConfigurationFileName(final String configurationFileName) {
+    configFileName = configurationFileName;
+    return this;
+  }
+
+  /**
+   * @return the configuration file name, or null if none was set
+   */
+  public String getConfigurationFileName() {
+    return configFileName;
+  }
+
+  /**
+   * Record the standard output setting.
+   * @param standardOut value to record for standard output
+   * @return this
+   */
+  @Override
+  public JVMClientProcess setStandardOut(final String standardOut) {
+    stdOut = standardOut;
+    return this;
+  }
+
+  /**
+   * @return the standard output setting, or null if none was set
+   */
+  public String getStandardOut() {
+    return stdOut;
+  }
+
+  /**
+   * Record the standard error setting.
+   * @param standardErr value to record for standard error
+   * @return this
+   */
+  @Override
+  public JVMClientProcess setStandardErr(final String standardErr) {
+    stdErr = standardErr;
+    return this;
+  }
+
+  /**
+   * @return the standard error setting, or null if none was set
+   */
+  public String getStandardErr() {
+    return stdErr;
+  }
+
+  /**
+   * Add a JVM option.
+   * @param option The full option, e.g. "-XX:+PrintGCDetails", "-Xms500m"
+   * @return this
+   */
+  public JVMClientProcess addOption(final String option) {
+    jvmOptions.add(option);
+    anyOptionSet = true;
+    return this;
+  }
+
+  /**
+   * @return the internal (live, mutable) list of JVM options added so far
+   */
+  public List<String> getOptions() {
+    return jvmOptions;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java
new file mode 100644
index 0000000..a7bf37c
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.bridge.client.grpc.DriverClientGrpcConfiguration;
+import org.apache.reef.bridge.client.grpc.parameters.DriverServicePort;
+import org.apache.reef.runtime.common.REEFLauncher;
+import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
+import org.apache.reef.runtime.common.launch.REEFErrorHandler;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler;
+import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.ThreadLogger;
+import org.apache.reef.util.logging.LoggingSetup;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.time.Clock;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver client launcher.
+ * Its main() takes the path of a serialized Tang configuration file and the
+ * driver service port from the command line, builds the merged configuration,
+ * and then injects and runs the Clock.
+ */
+public final class JavaDriverClientLauncher {
+
+  private static final Logger LOG = Logger.getLogger(JavaDriverClientLauncher.class.getName());
+
+  private static final Tang TANG = Tang.Factory.getTang();
+
+  private static final Configuration LAUNCHER_STATIC_CONFIG =
+      TANG.newConfigurationBuilder()
+          .bindNamedParameter(RemoteConfiguration.ManagerName.class, "DRIVER_CLIENT_LAUNCHER")
+          .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
+          .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+          .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class)
+          .build();
+
+  static {
+    LoggingSetup.setupCommonsLogging();
+  }
+
+  /**
+   * Main configuration object of the REEF component we are launching here.
+   */
+  private final Configuration envConfig;
+
+  /**
+   * JavaDriverClientLauncher is instantiated through Tang in getLauncher() below, using
+   * the configuration file path and driver service port provided as command line arguments.
+   * @param driverServicePort Port of the driver service; set as
+   *    DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT in the resulting configuration.
+   * @param configurationPath Path to the serialized Tang configuration file.
+   *    (The file must be in the local file system).
+   * @param configurationSerializer Serializer used to read the configuration file.
+   *    We currently use Avro to serialize Tang configs.
+   */
+  @Inject
+  private JavaDriverClientLauncher(
+      @Parameter(DriverServicePort.class) final Integer driverServicePort,
+      @Parameter(ClockConfigurationPath.class) final String configurationPath,
+      final ConfigurationSerializer configurationSerializer) {
+
+    this.envConfig = Configurations.merge(
+        LAUNCHER_STATIC_CONFIG,
+        DriverClientGrpcConfiguration.CONF
+            .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort)
+            .build(),
+        readConfigurationFromDisk(configurationPath, configurationSerializer));
+  }
+
+  /**
+   * Instantiate JavaDriverClientLauncher. This method is called from main().
+   * @param clockConfigPath Path to the local file that contains serialized configuration
+   *    for the driver client.
+   * @param driverServicePort Port of the driver service.
+   * @return An instance of the configured JavaDriverClientLauncher object.
+   * @throws RuntimeException (via fatal()) if the configuration cannot be bound
+   *    or the launcher cannot be injected.
+   */
+  private static JavaDriverClientLauncher getLauncher(final String clockConfigPath, final int driverServicePort) {
+
+    try {
+
+      final Configuration clockArgConfig = Configurations.merge(
+          LAUNCHER_STATIC_CONFIG,
+          DriverClientGrpcConfiguration.CONF
+              .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort)
+              .build(),
+          TANG.newConfigurationBuilder()
+              .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath)
+              .build());
+
+      return TANG.newInjector(clockArgConfig).getInstance(JavaDriverClientLauncher.class);
+
+    } catch (final BindException ex) {
+      throw fatal("Error in parsing the command line", ex);
+    } catch (final InjectionException ex) {
+      throw fatal("Unable to instantiate JavaDriverClientLauncher.", ex);
+    }
+  }
+
+  /**
+   * Read configuration from a given file and deserialize it
+   * into Tang configuration object that can be used for injection.
+   * Configuration is currently serialized using Avro.
+   * Progress is logged at FINER/FINEST level (only the file path is logged).
+   * @param configPath Path to the local file that contains serialized configuration
+   *    of the REEF component to launch.
+   * @param serializer An object to deserialize the configuration file.
+   * @return Tang configuration read and deserialized from a given file.
+   * @throws RuntimeException (via fatal()) if the file does not exist, is not readable,
+   *    or cannot be deserialized.
+   */
+  private static Configuration readConfigurationFromDisk(
+      final String configPath, final ConfigurationSerializer serializer) {
+
+    LOG.log(Level.FINER, "Loading configuration file: {0}", configPath);
+
+    final File configFile = new File(configPath);
+
+    if (!configFile.exists()) {
+      throw fatal(
+          "Configuration file " + configPath + " does not exist. Can be an issue in job submission.",
+          new FileNotFoundException(configPath));
+    }
+
+    if (!configFile.canRead()) {
+      throw fatal(
+          "Configuration file " + configPath + " exists, but can't be read.",
+          new IOException(configPath));
+    }
+
+    try {
+
+      final Configuration config = serializer.fromFile(configFile);
+      LOG.log(Level.FINEST, "The configuration file loaded: {0}", configPath);
+
+      return config;
+
+    } catch (final IOException e) {
+      throw fatal("Unable to parse the configuration file: " + configPath, e);
+    }
+  }
+
+  /**
+   * Parse the driver service port given on the command line.
+   * A malformed value is logged and reported through fatal(), like every other
+   * startup failure, instead of escaping as a bare NumberFormatException.
+   * @param portArg Command-line argument holding the port number.
+   * @return the parsed port number.
+   * @throws RuntimeException (via fatal()) if portArg is not a valid integer.
+   */
+  private static int parseDriverServicePort(final String portArg) {
+    try {
+      return Integer.parseInt(portArg);
+    } catch (final NumberFormatException ex) {
+      throw fatal("Driver service port is not a valid integer: " + portArg, ex);
+    }
+  }
+
+  /**
+   * Launches the driver client process.
+   * @param args Command-line arguments.
+   *    Must be exactly two elements: the local path to the runtime clock configuration file,
+   *    followed by the driver service port.
+   */
+  @SuppressWarnings("checkstyle:illegalcatch")
+  public static void main(final String[] args) {
+
+    LOG.log(Level.INFO, "Entering JavaDriverClientLauncher.main().");
+
+    LOG.log(Level.FINE, "JavaDriverClientLauncher started with user name [{0}]", System.getProperty("user.name"));
+    LOG.log(Level.FINE, "JavaDriverClientLauncher started. Assertions are {0} in this process.",
+        EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
+
+    if (args.length != 2) {
+      final String message = "JavaDriverClientLauncher must have two and only two arguments to specify " +
+          "the runtime clock configuration path and driver service port";
+
+      throw fatal(message, new IllegalArgumentException(message));
+    }
+
+    final JavaDriverClientLauncher launcher = getLauncher(args[0], parseDriverServicePort(args[1]));
+
+    Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig));
+    final Injector injector = TANG.newInjector(launcher.envConfig);
+    try (final Clock reef = injector.getInstance(Clock.class)) {
+      reef.run();
+    } catch (final Throwable ex) {
+      throw fatal("Unable to configure and start Clock.", ex);
+    }
+
+    ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after Clock.close():");
+
+    LOG.log(Level.INFO, "Exiting JavaDriverClientLauncher.main()");
+
+    System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main()
+  }
+
+  /**
+   * Wrap an exception into RuntimeException with a given message,
+   * and write the same message and exception to the log.
+   * @param msg an error message to log and pass into the RuntimeException.
+   * @param t A Throwable exception to log and wrap.
+   * @return a new Runtime exception wrapping a Throwable.
+   */
+  private static RuntimeException fatal(final String msg, final Throwable t) {
+    LOG.log(Level.SEVERE, msg, t);
+    return new RuntimeException(msg, t);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java
new file mode 100644
index 0000000..54645a0
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client.events;
+
+import org.apache.reef.bridge.client.IDriverServiceClient;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.util.Optional;
+
+/**
+ * Active context bridge.
+ * Holds the identifiers and evaluator descriptor of a context and forwards
+ * every context operation to the {@link IDriverServiceClient}.
+ */
+public final class ActiveContextBridge implements ActiveContext {
+
+  /** Client that receives all forwarded context operations. */
+  private final IDriverServiceClient serviceClient;
+
+  /** Identifier of this context; passed along with every forwarded operation. */
+  private final String id;
+
+  private final Optional<String> parentContextId;
+
+  private final String hostEvaluatorId;
+
+  private final EvaluatorDescriptor descriptor;
+
+  /**
+   * @param driverServiceClient client used to forward context operations
+   * @param contextId identifier of this context
+   * @param parentId identifier of the parent context, if any
+   * @param evaluatorId identifier of the evaluator reported by getEvaluatorId()
+   * @param evaluatorDescriptor descriptor reported by getEvaluatorDescriptor()
+   */
+  public ActiveContextBridge(
+      final IDriverServiceClient driverServiceClient,
+      final String contextId,
+      final Optional<String> parentId,
+      final String evaluatorId,
+      final EvaluatorDescriptor evaluatorDescriptor) {
+    serviceClient = driverServiceClient;
+    id = contextId;
+    parentContextId = parentId;
+    hostEvaluatorId = evaluatorId;
+    descriptor = evaluatorDescriptor;
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public Optional<String> getParentId() {
+    return parentContextId;
+  }
+
+  @Override
+  public String getEvaluatorId() {
+    return hostEvaluatorId;
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return descriptor;
+  }
+
+  /**
+   * Forward a close request for this context to the driver service client.
+   */
+  @Override
+  public void close() {
+    serviceClient.onContextClose(id);
+  }
+
+  /**
+   * Forward a task submission against this context.
+   * @param taskConf configuration of the task to submit
+   */
+  @Override
+  public void submitTask(final Configuration taskConf) {
+    serviceClient.onContextSubmitTask(id, taskConf);
+  }
+
+  /**
+   * Forward a child context submission against this context.
+   * @param contextConfiguration configuration of the child context
+   */
+  @Override
+  public void submitContext(final Configuration contextConfiguration) {
+    serviceClient.onContextSubmitContext(id, contextConfiguration);
+  }
+
+  /**
+   * Not supported by this bridge.
+   * @param contextConfiguration ignored
+   * @param serviceConfiguration ignored
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void submitContextAndService(
+      final Configuration contextConfiguration,
+      final Configuration serviceConfiguration) {
+    throw new UnsupportedOperationException("Service not supported");
+  }
+
+  /**
+   * Forward a message addressed to this context.
+   * @param message raw bytes of the message
+   */
+  @Override
+  public void sendMessage(final byte[] message) {
+    serviceClient.onContextMessage(id, message);
+  }
+}