You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/05/01 20:36:38 UTC
[6/6] reef git commit: [REEF-2003] Revise Driver Service to allow
static configuration.
[REEF-2003] Revise Driver Service to allow static configuration.
The driver service structure has been revised to allow easy static
configurations that do not involve Java on the client.
JIRA:
[REEF-2003](https://issues.apache.org/jira/browse/REEF-2003)
Pull Request:
Closes #
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d243aa2a
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d243aa2a
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d243aa2a
Branch: refs/heads/REEF-335
Commit: d243aa2a3cb89d03ee216c56c3dafbcea68eae89
Parents: 386069e
Author: Tyson Condie <tc...@apache.org>
Authored: Wed Apr 25 09:38:17 2018 -0700
Committer: Doug Service <do...@apache.org>
Committed: Tue May 1 01:28:25 2018 +0000
----------------------------------------------------------------------
lang/common/proto/bridge/ClientProtocol.proto | 41 +-
.../proto/bridge/DriverClientProtocol.proto | 42 +-
lang/java/reef-bridge-proto-java/pom.xml | 17 +
.../client/DefaultDriverClientStopHandler.java | 43 -
.../reef/bridge/client/DriverClientClock.java | 127 ---
.../client/DriverClientConfiguration.java | 202 ----
.../bridge/client/DriverClientDispatcher.java | 231 -----
.../client/DriverClientEvaluatorRequestor.java | 59 --
.../client/DriverClientExceptionHandler.java | 43 -
.../bridge/client/DriverServiceLauncher.java | 193 ++++
.../bridge/client/IAlarmDispatchHandler.java | 30 -
.../IDriverBridgeConfigurationProvider.java | 32 +
.../bridge/client/IDriverClientService.java | 45 -
.../IDriverRuntimeConfigurationProvider.java | 30 +
.../bridge/client/IDriverServiceClient.java | 132 ---
.../client/IDriverServiceRuntimeLauncher.java | 30 +
.../reef/bridge/client/JVMClientProcess.java | 121 ---
.../bridge/client/JavaDriverClientLauncher.java | 217 -----
.../client/WindowsRuntimePathProvider.java | 43 +
.../client/events/ActiveContextBridge.java | 102 --
.../client/events/AllocatedEvaluatorBridge.java | 166 ----
.../client/events/ClosedContextBridge.java | 77 --
.../client/events/CompletedEvaluatorBridge.java | 39 -
.../client/events/CompletedTaskBridge.java | 61 --
.../client/events/ContextMessageBridge.java | 69 --
.../client/events/FailedContextBridge.java | 110 ---
.../client/events/FailedEvaluatorBridge.java | 75 --
.../bridge/client/events/RunningTaskBridge.java | 90 --
.../bridge/client/events/TaskMessageBridge.java | 78 --
.../reef/bridge/client/events/package-info.java | 22 -
.../grpc/DriverClientGrpcConfiguration.java | 42 -
.../bridge/client/grpc/DriverClientService.java | 458 ---------
.../bridge/client/grpc/DriverServiceClient.java | 225 -----
.../reef/bridge/client/grpc/package-info.java | 22 -
.../grpc/parameters/DriverServicePort.java | 29 -
.../client/grpc/parameters/package-info.java | 22 -
.../LocalDriverServiceRuntimeLauncher.java | 57 ++
.../YarnDriverServiceRuntimeLauncher.java | 58 ++
.../reef/bridge/client/launch/package-info.java | 22 +
.../apache/reef/bridge/client/package-info.java | 2 +-
.../parameters/ClientDriverStopHandler.java | 36 -
.../DriverClientDispatchThreadCount.java | 30 -
.../bridge/client/parameters/package-info.java | 22 -
...LocalDriverRuntimeConfigurationProvider.java | 57 ++
.../YarnDriverRuntimeConfigurationProvider.java | 66 ++
.../bridge/client/runtime/package-info.java | 22 +
.../client/DefaultDriverClientStopHandler.java | 43 +
.../bridge/driver/client/DriverClientClock.java | 127 +++
.../client/DriverClientConfiguration.java | 202 ++++
.../driver/client/DriverClientDispatcher.java | 346 +++++++
.../client/DriverClientEvaluatorRequestor.java | 59 ++
.../client/DriverClientExceptionHandler.java | 43 +
.../driver/client/IAlarmDispatchHandler.java | 30 +
.../driver/client/IDriverClientService.java | 45 +
.../driver/client/IDriverServiceClient.java | 132 +++
.../bridge/driver/client/JVMClientProcess.java | 121 +++
.../driver/client/JavaDriverClientLauncher.java | 217 +++++
.../client/events/ActiveContextBridge.java | 111 +++
.../client/events/AllocatedEvaluatorBridge.java | 166 ++++
.../client/events/ClosedContextBridge.java | 77 ++
.../client/events/CompletedEvaluatorBridge.java | 39 +
.../client/events/CompletedTaskBridge.java | 61 ++
.../client/events/ContextMessageBridge.java | 69 ++
.../client/events/FailedContextBridge.java | 110 +++
.../client/events/FailedEvaluatorBridge.java | 75 ++
.../driver/client/events/RunningTaskBridge.java | 90 ++
.../driver/client/events/TaskMessageBridge.java | 78 ++
.../driver/client/events/package-info.java | 22 +
.../grpc/DriverClientGrpcConfiguration.java | 42 +
.../driver/client/grpc/DriverClientService.java | 611 ++++++++++++
.../driver/client/grpc/DriverServiceClient.java | 232 +++++
.../bridge/driver/client/grpc/package-info.java | 22 +
.../grpc/parameters/DriverServicePort.java | 29 +
.../client/grpc/parameters/package-info.java | 22 +
.../reef/bridge/driver/client/package-info.java | 22 +
.../parameters/ClientDriverStopHandler.java | 36 +
.../DriverClientDispatchThreadCount.java | 30 +
.../driver/client/parameters/package-info.java | 22 +
.../driver/service/DriverClientException.java | 30 +
.../service/DriverServiceConfiguration.java | 45 +
.../DriverServiceConfigurationProviderBase.java | 148 +++
.../driver/service/DriverServiceHandlers.java | 298 ++++++
.../bridge/driver/service/IDriverService.java | 170 ++++
.../IDriverServiceConfigurationProvider.java | 31 +
.../driver/service/grpc/GRPCDriverService.java | 919 +++++++++++++++++++
.../GRPCDriverServiceConfigurationProvider.java | 53 ++
.../driver/service/grpc/package-info.java | 22 +
.../bridge/driver/service/package-info.java | 22 +
.../service/parameters/DriverClientCommand.java | 31 +
.../driver/service/parameters/package-info.java | 22 +
.../examples/WindowsRuntimePathProvider.java | 43 -
.../reef/bridge/examples/hello/HelloDriver.java | 24 +
.../reef/bridge/examples/hello/HelloREEF.java | 16 +-
.../bridge/service/DriverClientException.java | 30 -
.../service/DriverServiceConfiguration.java | 47 -
.../bridge/service/DriverServiceHandlers.java | 236 -----
.../bridge/service/DriverServiceLauncher.java | 328 -------
.../reef/bridge/service/IDriverService.java | 138 ---
.../reef/bridge/service/RuntimeNames.java | 36 -
.../bridge/service/grpc/GRPCDriverService.java | 706 --------------
.../reef/bridge/service/grpc/package-info.java | 22 -
.../reef/bridge/service/package-info.java | 22 -
.../service/parameters/DriverClientCommand.java | 31 -
.../bridge/service/parameters/package-info.java | 22 -
104 files changed, 5828 insertions(+), 4732 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/common/proto/bridge/ClientProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/bridge/ClientProtocol.proto b/lang/common/proto/bridge/ClientProtocol.proto
index 68bdcaa..3962c00 100644
--- a/lang/common/proto/bridge/ClientProtocol.proto
+++ b/lang/common/proto/bridge/ClientProtocol.proto
@@ -19,8 +19,6 @@
syntax = "proto3";
-// option java_generic_services = true;
-// option java_multiple_files = true;
option java_package = "org.apache.reef.bridge.proto";
option java_outer_classname = "ClientProtocol";
option csharp_namespace = "Org.Apache.REEF.Bridge.Proto";
@@ -30,13 +28,15 @@ package driverbridge;
message LocalRuntimeParameters {
uint32 max_number_of_evaluators = 1;
string runtime_root_folder = 2;
- string jvm_heap_slack = 3;
+ float jvm_heap_slack = 3;
repeated string rack_names = 4;
}
message YarnRuntimeParameters {
string queue = 1;
- string job_submission_directory_prefix = 2;
+ string job_submission_directory = 2;
+ string filesystem_url = 3;
+ bytes security_token = 4;
}
message AzureBatchRuntimeParameters {
@@ -65,34 +65,11 @@ message DriverClientConfiguration {
// The command to launch the driver client
string driver_client_launch_command = 10;
- enum Handlers {
- // control events
- START = 0;
- STOP = 1;
-
- // evaluator events
- EVALUATOR_ALLOCATED = 5;
- EVALUATOR_COMPLETED = 6;
- EVALUATOR_FAILED = 7;
-
- // context events
- CONTEXT_ACTIVE = 10;
- CONTEXT_CLOSED = 11;
- CONTEXT_FAILED = 12;
- CONTEXT_MESSAGE = 13;
-
- // task events
- TASK_RUNNING = 15;
- TASK_FAILED = 16;
- TASK_COMPLETED = 17;
- TASK_MESSAGE = 18;
-
- // client events
- CLIENT_MESSAGE = 20;
- CLIENT_CLOSE = 21;
- CLIENT_CLOSE_WITH_MESSAGE = 22;
- }
- repeated Handlers handler = 11;
+ // Enable driver restart?
+ bool driver_restart_enable = 11;
+
+ // Driver restart evaluator recovery seconds (optional)
+ uint32 driver_restart_evaluator_recovery_seconds = 12;
// TCP port range
uint32 tcp_port_range_begin = 15;
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/common/proto/bridge/DriverClientProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/bridge/DriverClientProtocol.proto b/lang/common/proto/bridge/DriverClientProtocol.proto
index f80cff2..6d0d08b 100644
--- a/lang/common/proto/bridge/DriverClientProtocol.proto
+++ b/lang/common/proto/bridge/DriverClientProtocol.proto
@@ -74,6 +74,33 @@ service DriverClient {
rpc ClientCloseHandler (Void) returns (Void) {}
rpc ClientCloseWithMessageHandler (ClientMessageInfo) returns (Void) {}
+
+ // Driver Restart Handlers
+ rpc DriverRestartHandler (DriverRestartInfo) returns (Void) {}
+
+ rpc DriverRestartActiveContextHandler (ContextInfo) returns (Void) {}
+
+ rpc DriverRestartRunningTaskHandler (TaskInfo) returns (Void) {}
+
+ rpc DriverRestartCompletedHandler (DriverRestartCompletedInfo) returns (Void) {}
+
+ rpc DriverRestartFailedEvaluatorHandler (EvaluatorInfo) returns (Void) {}
+}
+
+// Driver restart information
+message DriverRestartInfo {
+ uint32 resubmission_attempts = 1;
+
+ StartTimeInfo start_time = 2;
+
+ repeated string expected_evaluator_ids = 3;
+}
+
+// Driver restart completed information
+message DriverRestartCompletedInfo {
+ StopTimeInfo completion_time = 1;
+
+ bool is_timed_out = 2;
}
// IdleStatus response to idleness inquiry
@@ -127,6 +154,9 @@ message ContextInfo {
string parent_id = 3;
+ // Carry this with us for driver restart
+ EvaluatorDescriptorInfo evaluator_descriptor_info = 4;
+
// Optional exception information
ExceptionInfo exception = 5;
}
@@ -142,13 +172,19 @@ message ContextMessageInfo {
}
message TaskInfo {
+ // Task identifier.
string task_id = 1;
- string context_id = 2;
+ // Task result.
+ bytes result = 2;
- bytes result = 3;
+ /* Carry entire context info since client may not have received it
+ * when submitting task against allocated evalautor.
+ */
+ ContextInfo context = 5;
- ExceptionInfo exception = 5;
+ // Possible exception encountered in task execution.
+ ExceptionInfo exception = 10;
}
message TaskMessageInfo {
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/pom.xml b/lang/java/reef-bridge-proto-java/pom.xml
index f177b7c..a1ccc33 100644
--- a/lang/java/reef-bridge-proto-java/pom.xml
+++ b/lang/java/reef-bridge-proto-java/pom.xml
@@ -92,6 +92,23 @@ under the License.
<artifactId>reef-runtime-yarn</artifactId>
<version>0.17.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>20.0</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion> <!-- declare the exclusion here -->
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java
deleted file mode 100644
index 8636f7a..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StopTime;
-
-import javax.inject.Inject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Default java client driver stop handler.
- */
-public final class DefaultDriverClientStopHandler implements EventHandler<StopTime> {
-
- private static final Logger LOG = Logger.getLogger(DefaultDriverClientStopHandler.class.getName());
-
- @Inject
- private DefaultDriverClientStopHandler() {}
-
- @Override
- public void onNext(final StopTime value) {
- LOG.log(Level.FINEST, "Stop time {0}", value);
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java
deleted file mode 100644
index 162cbe5..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.Clock;
-import org.apache.reef.wake.time.Time;
-import org.apache.reef.wake.time.event.Alarm;
-import org.apache.reef.wake.time.runtime.Timer;
-import org.apache.reef.wake.time.runtime.event.ClientAlarm;
-
-import javax.inject.Inject;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * The bridge driver client clock.
- */
-public final class DriverClientClock implements Clock, IAlarmDispatchHandler {
-
- private static final Logger LOG = Logger.getLogger(DriverClientClock.class.getName());
-
- private final IDriverClientService driverClientService;
-
- private final IDriverServiceClient driverServiceClient;
-
- private final Timer timer;
-
- private final Map<String, ClientAlarm> alarmMap = new HashMap<>();
-
- private boolean closed = false;
-
- @Inject
- private DriverClientClock(
- final Timer timer,
- final IDriverClientService driverClientService,
- final IDriverServiceClient driverServiceClient) {
- this.timer = timer;
- this.driverClientService = driverClientService;
- this.driverServiceClient = driverServiceClient;
- }
-
- @Override
- public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
- final ClientAlarm alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler);
- final String alarmId = UUID.randomUUID().toString();
- this.alarmMap.put(alarmId, alarm);
- this.driverServiceClient.onSetAlarm(alarmId, offset);
- return alarm;
- }
-
- @Override
- public void close() {
- stop();
- }
-
- @Override
- public void stop() {
- if (!closed) {
- this.closed = true;
- this.driverServiceClient.onShutdown();
- }
- }
-
- @Override
- public void stop(final Throwable exception) {
- if (!closed) {
- this.closed = true;
- this.driverServiceClient.onShutdown(exception);
- }
- }
-
- @Override
- public boolean isIdle() {
- return this.closed && this.alarmMap.isEmpty();
- }
-
- @Override
- public boolean isClosed() {
- return this.closed;
- }
-
- @Override
- public void run() {
- try {
- this.driverClientService.start();
- this.driverClientService.awaitTermination();
- } catch (IOException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Alarm clock event handler.
- * @param alarmId alarm identifier
- */
- @Override
- public void onNext(final String alarmId) {
- if (this.alarmMap.containsKey(alarmId)) {
- final ClientAlarm clientAlarm = this.alarmMap.remove(alarmId);
- clientAlarm.run();
- } else {
- LOG.log(Level.SEVERE, "Unknown alarm id {0}", alarmId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java
deleted file mode 100644
index 50da3ce..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.bridge.client.parameters.DriverClientDispatchThreadCount;
-import org.apache.reef.bridge.client.parameters.ClientDriverStopHandler;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.context.ContextMessage;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.CompletedEvaluator;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.driver.parameters.*;
-import org.apache.reef.driver.task.*;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.OptionalImpl;
-import org.apache.reef.tang.formats.RequiredImpl;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.Clock;
-import org.apache.reef.wake.time.event.StartTime;
-import org.apache.reef.wake.time.event.StopTime;
-
-/**
- * Driver client configuration.
- */
-public final class DriverClientConfiguration extends ConfigurationModuleBuilder {
-
- /**
- * The event handler invoked right after the driver boots up.
- */
- public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
-
- /**
- * The event handler invoked right before the driver shuts down. Defaults to ignore.
- */
- public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
-
- // ***** EVALUATOR HANDLER BINDINGS:
-
- /**
- * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
- */
- public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>();
-
- /**
- * Event handler for completed evaluators. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>();
-
- /**
- * Event handler for failed evaluators. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>();
-
- // ***** TASK HANDLER BINDINGS:
-
- /**
- * Event handler for task messages. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>();
-
- /**
- * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
- */
- public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>();
-
- /**
- * Event handler for failed tasks. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>();
-
- /**
- * Event handler for running tasks. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
-
- /**
- * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
- * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
- */
- public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>();
-
- // ***** CLIENT HANDLER BINDINGS:
-
- /**
- * Event handler for client messages. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_MESSAGE = new OptionalImpl<>();
-
- /**
- * Event handler for close messages sent by the client. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<Void>> ON_CLIENT_CLOSED = new OptionalImpl<>();
-
- /**
- * Event handler for close messages sent by the client. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_CLOSED_MESSAGE = new OptionalImpl<>();
-
- // ***** CONTEXT HANDLER BINDINGS:
-
- /**
- * Event handler for active context. Defaults to closing the context if not bound.
- */
- public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
-
- /**
- * Event handler for closed context. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
-
- /**
- * Event handler for closed context. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>();
-
- /**
- * Event handler for context messages. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
-
- /**
- * Number of dispatch threads to use.
- */
- public static final OptionalImpl<Integer> CLIENT_DRIVER_DISPATCH_THREAD_COUNT = new OptionalImpl<>();
-
- /**
- * Alarm dispatch handler.
- */
- public static final OptionalImpl<IAlarmDispatchHandler> ALARM_DISPATCH_HANDLER = new OptionalImpl<>();
-
- /**
- * Default to gRPC Driver Client Service.
- */
- public static final OptionalImpl<IDriverClientService> DRIVER_CLIENT_SERVICE = new OptionalImpl<>();
-
- /**
- * Default to gRPC Driver Service Client.
- */
- public static final OptionalImpl<IDriverServiceClient> DRIVER_SERVICE_CLIENT = new OptionalImpl<>();
-
- /**
- * ConfigurationModule to fill out to get a legal Driver Configuration.
- */
- public static final ConfigurationModule CONF = new DriverClientConfiguration()
- .bindImplementation(Clock.class, DriverClientClock.class)
- .bindImplementation(EvaluatorRequestor.class, DriverClientEvaluatorRequestor.class)
- .bindImplementation(IAlarmDispatchHandler.class, ALARM_DISPATCH_HANDLER)
- .bindImplementation(IDriverClientService.class, DRIVER_CLIENT_SERVICE)
- .bindImplementation(IDriverServiceClient.class, DRIVER_SERVICE_CLIENT)
-
- .bindNamedParameter(DriverClientDispatchThreadCount.class, CLIENT_DRIVER_DISPATCH_THREAD_COUNT)
-
- // Driver start/stop handlers
- .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED)
- .bindSetEntry(ClientDriverStopHandler.class, ON_DRIVER_STOP)
-
- // Evaluator handlers
- .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED)
- .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED)
- .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED)
-
- // Task handlers
- .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING)
- .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED)
- .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE)
- .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED)
- .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED)
-
- // Context handlers
- .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
- .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED)
- .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
- .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED)
-
- // Client handlers
- .bindSetEntry(ClientMessageHandlers.class, ON_CLIENT_MESSAGE)
- .bindSetEntry(ClientCloseHandlers.class, ON_CLIENT_CLOSED)
- .bindSetEntry(ClientCloseWithMessageHandlers.class, ON_CLIENT_CLOSED_MESSAGE)
-
- .build();
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java
deleted file mode 100644
index 3dd9b88..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import com.google.common.collect.Sets;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.bridge.client.parameters.DriverClientDispatchThreadCount;
-import org.apache.reef.bridge.client.parameters.ClientDriverStopHandler;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.context.ContextMessage;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.CompletedEvaluator;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.driver.parameters.*;
-import org.apache.reef.driver.task.*;
-import org.apache.reef.runtime.common.utils.DispatchingEStage;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
-import org.apache.reef.wake.time.event.StopTime;
-
-import javax.inject.Inject;
-import java.util.Set;
-
-/**
- * Async dispatch of client driver events.
- */
-@Private
-public final class DriverClientDispatcher {
-
- /**
- * Dispatcher used for application provided event handlers.
- */
- private final DispatchingEStage applicationDispatcher;
-
- /**
- * Dispatcher for client close events.
- */
- private final DispatchingEStage clientCloseDispatcher;
-
- /**
- * Dispatcher for client close with message events.
- */
- private final DispatchingEStage clientCloseWithMessageDispatcher;
-
- /**
- * Dispatcher for client messages.
- */
- private final DispatchingEStage clientMessageDispatcher;
-
- /**
- * The alarm dispatcher.
- */
- private final DispatchingEStage alarmDispatcher;
-
- @Inject
- private DriverClientDispatcher(
- final DriverClientExceptionHandler driverExceptionHandler,
- final IAlarmDispatchHandler alarmDispatchHandler,
- @Parameter(DriverClientDispatchThreadCount.class)
- final Integer numberOfThreads,
- // Application-provided start and stop handlers
- @Parameter(DriverStartHandler.class)
- final Set<EventHandler<StartTime>> startHandlers,
- @Parameter(ClientDriverStopHandler.class)
- final Set<EventHandler<StopTime>> stopHandlers,
- // Application-provided Context event handlers
- @Parameter(ContextActiveHandlers.class)
- final Set<EventHandler<ActiveContext>> contextActiveHandlers,
- @Parameter(ContextClosedHandlers.class)
- final Set<EventHandler<ClosedContext>> contextClosedHandlers,
- @Parameter(ContextFailedHandlers.class)
- final Set<EventHandler<FailedContext>> contextFailedHandlers,
- @Parameter(ContextMessageHandlers.class)
- final Set<EventHandler<ContextMessage>> contextMessageHandlers,
- // Application-provided Task event handlers
- @Parameter(TaskRunningHandlers.class)
- final Set<EventHandler<RunningTask>> taskRunningHandlers,
- @Parameter(TaskCompletedHandlers.class)
- final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
- @Parameter(TaskSuspendedHandlers.class)
- final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
- @Parameter(TaskMessageHandlers.class)
- final Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
- @Parameter(TaskFailedHandlers.class)
- final Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
- // Application-provided Evaluator event handlers
- @Parameter(EvaluatorAllocatedHandlers.class)
- final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
- @Parameter(EvaluatorFailedHandlers.class)
- final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
- @Parameter(EvaluatorCompletedHandlers.class)
- final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
- // Client handlers
- @Parameter(ClientCloseHandlers.class)
- final Set<EventHandler<Void>> clientCloseHandlers,
- @Parameter(ClientCloseWithMessageHandlers.class)
- final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers,
- @Parameter(ClientMessageHandlers.class)
- final Set<EventHandler<byte[]>> clientMessageHandlers) {
-
- this.applicationDispatcher = new DispatchingEStage(
- driverExceptionHandler, numberOfThreads, "ClientDriverDispatcher");
- // Application start and stop handlers
- this.applicationDispatcher.register(StartTime.class, startHandlers);
- this.applicationDispatcher.register(StopTime.class, stopHandlers);
- // Application Context event handlers
- this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
- this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
- this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
- this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);
-
- // Application Task event handlers.
- this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
- this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
- this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
- this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
- this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);
-
- // Application Evaluator event handlers
- this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);
- this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers);
- this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers);
-
- // Client event handlers;
- this.clientCloseDispatcher = new DispatchingEStage(this.applicationDispatcher);
- this.clientCloseDispatcher.register(Void.class, clientCloseHandlers);
-
- this.clientCloseWithMessageDispatcher = new DispatchingEStage(this.applicationDispatcher);
- this.clientCloseWithMessageDispatcher.register(byte[].class, clientCloseWithMessageHandlers);
-
- this.clientMessageDispatcher = new DispatchingEStage(this.applicationDispatcher);
- this.clientMessageDispatcher.register(byte[].class, clientMessageHandlers);
-
- // Alarm event handlers
- this.alarmDispatcher = new DispatchingEStage(this.applicationDispatcher);
- this.alarmDispatcher.register(String.class,
- Sets.newHashSet((EventHandler<String>)alarmDispatchHandler));
- }
-
- public void dispatch(final StartTime startTime) {
- this.applicationDispatcher.onNext(StartTime.class, startTime);
- }
-
- public void dispatch(final StopTime stopTime) {
- this.applicationDispatcher.onNext(StopTime.class, stopTime);
- }
-
- public void dispatch(final ActiveContext context) {
- this.applicationDispatcher.onNext(ActiveContext.class, context);
- }
-
- public void dispatch(final ClosedContext context) {
- this.applicationDispatcher.onNext(ClosedContext.class, context);
- }
-
- public void dispatch(final FailedContext context) {
- this.applicationDispatcher.onNext(FailedContext.class, context);
- }
-
- public void dispatch(final ContextMessage message) {
- this.applicationDispatcher.onNext(ContextMessage.class, message);
- }
-
- public void dispatch(final AllocatedEvaluator evaluator) {
- this.applicationDispatcher.onNext(AllocatedEvaluator.class, evaluator);
- }
-
- public void dispatch(final FailedEvaluator evaluator) {
- this.applicationDispatcher.onNext(FailedEvaluator.class, evaluator);
- }
-
- public void dispatch(final CompletedEvaluator evaluator) {
- this.applicationDispatcher.onNext(CompletedEvaluator.class, evaluator);
- }
-
- public void dispatch(final RunningTask task) {
- this.applicationDispatcher.onNext(RunningTask.class, task);
- }
-
- public void dispatch(final CompletedTask task) {
- this.applicationDispatcher.onNext(CompletedTask.class, task);
- }
-
- public void dispatch(final FailedTask task) {
- this.applicationDispatcher.onNext(FailedTask.class, task);
- }
-
- public void dispatch(final SuspendedTask task) {
- this.applicationDispatcher.onNext(SuspendedTask.class, task);
- }
-
- public void dispatch(final TaskMessage message) {
- this.applicationDispatcher.onNext(TaskMessage.class, message);
- }
-
- public void clientCloseDispatch() {
- this.clientCloseDispatcher.onNext(Void.class, null);
- }
-
- public void clientCloseWithMessageDispatch(final byte[] message) {
- this.clientCloseWithMessageDispatcher.onNext(byte[].class, message);
- }
-
- public void clientMessageDispatch(final byte[] message) {
- this.clientMessageDispatcher.onNext(byte[].class, message);
- }
-
- public void dispatchAlarm(final String alarmId) {
- this.alarmDispatcher.onNext(String.class, alarmId);
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java
deleted file mode 100644
index a774b2f..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.driver.evaluator.EvaluatorRequest;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-
-import javax.inject.Inject;
-
-/**
- * Driver Client evaluator requestor.
- */
-public final class DriverClientEvaluatorRequestor implements EvaluatorRequestor {
-
- private final IDriverServiceClient driverServiceClient;
-
- @Inject
- private DriverClientEvaluatorRequestor(final IDriverServiceClient driverServiceClient) {
- this.driverServiceClient = driverServiceClient;
- }
-
- @Override
- public void submit(final EvaluatorRequest req) {
- this.driverServiceClient.onEvaluatorRequest(req);
- }
-
- @Override
- public EvaluatorRequest.Builder newRequest() {
- return new DriverClientEvaluatorRequestor.Builder();
- }
-
- /**
- * {@link DriverClientEvaluatorRequestor.Builder} extended with a new submit method.
- * {@link EvaluatorRequest}s are built using this builder.
- */
- public final class Builder extends EvaluatorRequest.Builder<DriverClientEvaluatorRequestor.Builder> {
- @Override
- public synchronized void submit() {
- DriverClientEvaluatorRequestor.this.submit(this.build());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java
deleted file mode 100644
index d09ce41..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.wake.EventHandler;
-
-import javax.inject.Inject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Driver client exception handler.
- */
-public final class DriverClientExceptionHandler implements EventHandler<Throwable> {
- private static final Logger LOG = Logger.getLogger(DriverClientExceptionHandler.class.getName());
-
- @Inject
- private DriverClientExceptionHandler() {
- LOG.log(Level.FINE, "Instantiated 'DriverExceptionHandler'");
- }
-
-
- @Override
- public void onNext(final Throwable throwable) {
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java
new file mode 100644
index 0000000..0f3567c
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import com.google.protobuf.util.JsonFormat;
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.bridge.client.launch.LocalDriverServiceRuntimeLauncher;
+import org.apache.reef.bridge.client.launch.YarnDriverServiceRuntimeLauncher;
+import org.apache.reef.bridge.client.runtime.LocalDriverRuntimeConfigurationProvider;
+import org.apache.reef.bridge.client.runtime.YarnDriverRuntimeConfigurationProvider;
+import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider;
+import org.apache.reef.bridge.driver.service.grpc.GRPCDriverServiceConfigurationProvider;
+import org.apache.reef.bridge.driver.client.JavaDriverClientLauncher;
+import org.apache.reef.bridge.proto.ClientProtocol;
+import org.apache.reef.runtime.common.files.*;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.local.LocalClasspathProvider;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.OSUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver Service Launcher - main class.
+ */
+public final class DriverServiceLauncher {
+
+ /**
+ * Standard Java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(DriverServiceLauncher.class.getName());
+
+ /**
+ * This class should not be instantiated.
+ */
+ private DriverServiceLauncher() {
+ throw new RuntimeException("Do not instantiate this class!");
+ }
+
+ public static void submit(
+ final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto,
+ final Configuration driverClientConfiguration)
+ throws InjectionException, IOException {
+ ClientProtocol.DriverClientConfiguration.Builder builder =
+ ClientProtocol.DriverClientConfiguration.newBuilder(driverClientConfigurationProto);
+ final File driverClientConfigurationFile = new File("driverclient.conf");
+ try {
+ // Write driver client configuration to a file
+ final Injector driverClientInjector = Tang.Factory.getTang().newInjector(driverClientConfiguration);
+ final ConfigurationSerializer configurationSerializer =
+ driverClientInjector.getInstance(ConfigurationSerializer.class);
+ configurationSerializer.toFile(driverClientConfiguration, driverClientConfigurationFile);
+
+ // Resolve OS Runtime Path Provider.
+ final Configuration runtimeOSConfiguration =
+ driverClientConfigurationProto.getRuntimeCase() ==
+ ClientProtocol.DriverClientConfiguration.RuntimeCase.YARN_RUNTIME ?
+ Tang.Factory.getTang().newConfigurationBuilder()
+ .bind(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+ .build() :
+ Tang.Factory.getTang().newConfigurationBuilder()
+ .bind(RuntimePathProvider.class,
+ OSUtils.isWindows() ? WindowsRuntimePathProvider.class : UnixJVMPathProvider.class)
+ .bind(RuntimeClasspathProvider.class, LocalClasspathProvider.class)
+ .build();
+ final Injector runtimeInjector = Tang.Factory.getTang().newInjector(runtimeOSConfiguration);
+ final REEFFileNames fileNames = runtimeInjector.getInstance(REEFFileNames.class);
+ final ClasspathProvider classpathProvider = runtimeInjector.getInstance(ClasspathProvider.class);
+ final RuntimePathProvider runtimePathProvider = runtimeInjector.getInstance(RuntimePathProvider.class);
+ final List<String> launchCommand = new JavaLaunchCommandBuilder(JavaDriverClientLauncher.class, null)
+ .setConfigurationFilePaths(
+ Collections.singletonList("./" + fileNames.getLocalFolderPath() + "/" +
+ driverClientConfigurationFile.getName()))
+ .setJavaPath(runtimePathProvider.getPath())
+ .setClassPath(classpathProvider.getEvaluatorClasspath())
+ .build();
+ final String cmd = StringUtils.join(launchCommand, ' ');
+ builder.setDriverClientLaunchCommand(cmd);
+ builder.addLocalFiles(driverClientConfigurationFile.getAbsolutePath());
+
+ // call main()
+ final File driverClientConfFile = new File("driverclient.json");
+ try {
+ try (PrintWriter out = new PrintWriter(driverClientConfFile)) {
+ out.println(JsonFormat.printer().print(builder.build()));
+ }
+ main(new String[]{driverClientConfFile.getAbsolutePath()});
+ } finally {
+ driverClientConfFile.delete();
+ }
+ } finally {
+ driverClientConfigurationFile.delete();
+ }
+ }
+
+ private static IDriverServiceRuntimeLauncher getLocalDriverServiceLauncher() throws InjectionException {
+ final Configuration localJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindImplementation(IDriverRuntimeConfigurationProvider.class,
+ LocalDriverRuntimeConfigurationProvider.class)
+ .bindImplementation(IDriverServiceConfigurationProvider.class,
+ GRPCDriverServiceConfigurationProvider.class)
+ .bindImplementation(RuntimeClasspathProvider.class, LocalClasspathProvider.class)
+ .build();
+ return Tang.Factory.getTang()
+ .newInjector(localJobSubmissionClientConfig).getInstance(LocalDriverServiceRuntimeLauncher.class);
+ }
+
+
+ private static IDriverServiceRuntimeLauncher getYarnDriverServiceLauncher() throws InjectionException {
+ final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindImplementation(IDriverRuntimeConfigurationProvider.class,
+ YarnDriverRuntimeConfigurationProvider.class)
+ .bindImplementation(IDriverServiceConfigurationProvider.class,
+ GRPCDriverServiceConfigurationProvider.class)
+ .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+ .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
+ .build();
+ return Tang.Factory.getTang()
+ .newInjector(yarnJobSubmissionClientConfig).getInstance(YarnDriverServiceRuntimeLauncher.class);
+ }
+
+ /**
+ * Main method that launches the REEF job.
+ *
+ * @param args command line parameters.
+ */
+ public static void main(final String[] args) {
+ try {
+ if (args.length != 1) {
+ LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() +
+ " accepts single argument referencing a file that contains a client protocol buffer driver configuration");
+ }
+ final String content;
+ try {
+ content = new String(Files.readAllBytes(Paths.get(args[0])));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder =
+ ClientProtocol.DriverClientConfiguration.newBuilder();
+ JsonFormat.parser()
+ .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry())
+ .merge(content, driverClientConfigurationProtoBuilder);
+ final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto =
+ driverClientConfigurationProtoBuilder.build();
+ switch (driverClientConfigurationProto.getRuntimeCase()) {
+ case YARN_RUNTIME:
+ final IDriverServiceRuntimeLauncher yarnDriverServiceLauncher = getYarnDriverServiceLauncher();
+ yarnDriverServiceLauncher.launch(driverClientConfigurationProto);
+ break;
+ case LOCAL_RUNTIME:
+ final IDriverServiceRuntimeLauncher localDriverServiceLauncher = getLocalDriverServiceLauncher();
+ localDriverServiceLauncher.launch(driverClientConfigurationProto);
+ break;
+ default:
+ }
+ LOG.log(Level.INFO, "JavaBridge: Stop Client {0}", driverClientConfigurationProto.getJobid());
+ } catch (final BindException | InjectionException | IOException ex) {
+ LOG.log(Level.SEVERE, "Job configuration error", ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java
deleted file mode 100644
index a9fee48..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.tang.annotations.DefaultImplementation;
-import org.apache.reef.wake.EventHandler;
-
-/**
- * Alarm dispatch handler.
- */
-@DefaultImplementation(DriverClientClock.class)
-public interface IAlarmDispatchHandler extends EventHandler<String> {
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverBridgeConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverBridgeConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverBridgeConfigurationProvider.java
new file mode 100644
index 0000000..c9fefb0
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverBridgeConfigurationProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.bridge.proto.ClientProtocol;
+import org.apache.reef.tang.Configuration;
+
+/**
+ * Driver bridge configuration provider.
+ */
+public interface IDriverBridgeConfigurationProvider {
+
+ Configuration getConfiguration(final ClientProtocol.DriverClientConfiguration driverConfiguration);
+
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java
deleted file mode 100644
index c71b554..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.bridge.client.grpc.DriverClientService;
-import org.apache.reef.tang.annotations.DefaultImplementation;
-
-import java.io.IOException;
-
-/**
- * Interface that driver client services implement.
- */
-@DefaultImplementation(DriverClientService.class)
-public interface IDriverClientService {
-
- /**
- * Start the DriverClient service.
- * @throws IOException when unable to start service
- */
- void start() throws IOException;
-
-
- /**
- * Wait for termination of driver client service.
- */
- void awaitTermination() throws InterruptedException;
-
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverRuntimeConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverRuntimeConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverRuntimeConfigurationProvider.java
new file mode 100644
index 0000000..4e32464
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverRuntimeConfigurationProvider.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.bridge.client.runtime.LocalDriverRuntimeConfigurationProvider;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * Configuration provider for the runtime.
+ */
+@DefaultImplementation(LocalDriverRuntimeConfigurationProvider.class)
+public interface IDriverRuntimeConfigurationProvider extends IDriverBridgeConfigurationProvider {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java
deleted file mode 100644
index e1f8cb7..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.bridge.client.grpc.DriverServiceClient;
-import org.apache.reef.driver.evaluator.EvaluatorRequest;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.annotations.DefaultImplementation;
-import org.apache.reef.util.Optional;
-
-import java.io.File;
-import java.util.List;
-
-/**
- * Forwards application requests to driver server.
- */
-@DefaultImplementation(DriverServiceClient.class)
-public interface IDriverServiceClient {
-
- /**
- * Initiate shutdown.
- */
- void onShutdown();
-
- /**
- * Initiate shutdown with error.
- * @param ex exception error
- */
- void onShutdown(final Throwable ex);
-
- /**
- * Set alarm.
- * @param alarmId alarm identifier
- * @param timeoutMS timeout in milliseconds
- */
- void onSetAlarm(final String alarmId, final int timeoutMS);
-
- /**
- * Request evaluators.
- * @param evaluatorRequest event
- */
- void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest);
-
- /**
- * Close evaluator.
- * @param evalautorId to close
- */
- void onEvaluatorClose(final String evalautorId);
-
- /**
- * Submit context and/or task.
- * @param evaluatorId to submit against
- * @param contextConfiguration context configuration
- * @param taskConfiguration task configuration
- * @param evaluatorProcess evaluator process
- * @param addFileList to include
- * @param addLibraryList to include
- */
- void onEvaluatorSubmit(
- final String evaluatorId,
- final Optional<Configuration> contextConfiguration,
- final Optional<Configuration> taskConfiguration,
- final Optional<JVMClientProcess> evaluatorProcess,
- final Optional<List<File>> addFileList,
- final Optional<List<File>> addLibraryList);
-
- // Context Operations
-
- /**
- * Close context.
- * @param contextId to close
- */
- void onContextClose(final String contextId);
-
- /**
- * Submit child context.
- * @param contextId to submit against
- * @param contextConfiguration for child context
- */
- void onContextSubmitContext(
- final String contextId,
- final Configuration contextConfiguration);
-
- /**
- * Submit task.
- * @param contextId to submit against
- * @param taskConfiguration for task
- */
- void onContextSubmitTask(
- final String contextId,
- final Configuration taskConfiguration);
-
- /**
- * Send message to context.
- * @param contextId to destination context
- * @param message to send
- */
- void onContextMessage(final String contextId, final byte[] message);
-
- // Task operations
-
- /**
- * Close the task.
- * @param taskId to close
- * @param message optional message to include
- */
- void onTaskClose(final String taskId, final Optional<byte[]> message);
-
- /**
- * Send task a message.
- * @param taskId of destination task
- * @param message to send
- */
- void onTaskMessage(final String taskId, final byte[] message);
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceRuntimeLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceRuntimeLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceRuntimeLauncher.java
new file mode 100644
index 0000000..789fa03
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceRuntimeLauncher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.bridge.proto.ClientProtocol;
+
+/**
+ * Driver service launcher.
+ */
+public interface IDriverServiceRuntimeLauncher {
+
+ void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java
deleted file mode 100644
index cdcb9b5..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.evaluator.EvaluatorProcess;
-import org.apache.reef.driver.evaluator.EvaluatorType;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Stub class for Evaluator Process on driver client.
- */
-@Private
-public final class JVMClientProcess implements EvaluatorProcess {
-
- private boolean optionSet = false;
-
- private int megaBytes = 0;
-
- private String configurationFileName = null;
-
- private String standardOut = null;
-
- private String standardErr = null;
-
- private final List<String> optionList = new ArrayList<>();
-
- public JVMClientProcess() {
- }
-
- @Override
- public List<String> getCommandLine() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public EvaluatorType getType() {
- return EvaluatorType.JVM;
- }
-
- @Override
- public JVMClientProcess setMemory(final int mb) {
- this.megaBytes = mb;
- this.optionSet = true;
- return this;
- }
-
- public int getMemory() {
- return this.megaBytes;
- }
-
- @Override
- public boolean isOptionSet() {
- return optionSet;
- }
-
- @Override
- public JVMClientProcess setConfigurationFileName(final String configurationFileName) {
- this.configurationFileName = configurationFileName;
- return this;
- }
-
- public String getConfigurationFileName() {
- return this.configurationFileName;
- }
-
- @Override
- public JVMClientProcess setStandardOut(final String standardOut) {
- this.standardOut = standardOut;
- return this;
- }
-
- public String getStandardOut() {
- return this.standardOut;
- }
-
- @Override
- public JVMClientProcess setStandardErr(final String standardErr) {
- this.standardErr = standardErr;
- return this;
- }
-
- public String getStandardErr() {
- return this.standardErr;
- }
-
- /**
- * Add a JVM option.
- * @param option The full option, e.g. "-XX:+PrintGCDetails", "-Xms500m"
- * @return this
- */
- public JVMClientProcess addOption(final String option) {
- this.optionList.add(option);
- optionSet = true;
- return this;
- }
-
- public List<String> getOptions() {
- return this.optionList;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java
deleted file mode 100644
index a7bf37c..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.bridge.client.grpc.DriverClientGrpcConfiguration;
-import org.apache.reef.bridge.client.grpc.parameters.DriverServicePort;
-import org.apache.reef.runtime.common.REEFLauncher;
-import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
-import org.apache.reef.runtime.common.launch.REEFErrorHandler;
-import org.apache.reef.runtime.common.launch.REEFMessageCodec;
-import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler;
-import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.exceptions.BindException;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
-import org.apache.reef.util.EnvironmentUtils;
-import org.apache.reef.util.ThreadLogger;
-import org.apache.reef.util.logging.LoggingSetup;
-import org.apache.reef.wake.remote.RemoteConfiguration;
-import org.apache.reef.wake.time.Clock;
-
-import javax.inject.Inject;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Driver client launcher.
- */
-public final class JavaDriverClientLauncher {
-
- private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName());
-
- private static final Tang TANG = Tang.Factory.getTang();
-
- private static final Configuration LAUNCHER_STATIC_CONFIG =
- TANG.newConfigurationBuilder()
- .bindNamedParameter(RemoteConfiguration.ManagerName.class, "DRIVER_CLIENT_LAUNCHER")
- .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
- .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
- .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class)
- .build();
-
- static {
- LoggingSetup.setupCommonsLogging();
- }
-
- /**
- * Main configuration object of the REEF component we are launching here.
- */
- private final Configuration envConfig;
-
- /**
- * REEFLauncher is instantiated in the main() method below using
- * Tang configuration file provided as a command line argument.
- * @param configurationPath Path to the serialized Tang configuration file.
- * (The file must be in the local file system).
- * @param configurationSerializer Serializer used to read the configuration file.
- * We currently use Avro to serialize Tang configs.
- */
- @Inject
- private JavaDriverClientLauncher(
- @Parameter(DriverServicePort.class) final Integer driverServicePort,
- @Parameter(ClockConfigurationPath.class) final String configurationPath,
- final ConfigurationSerializer configurationSerializer) {
-
- this.envConfig = Configurations.merge(
- LAUNCHER_STATIC_CONFIG,
- DriverClientGrpcConfiguration.CONF
- .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort)
- .build(),
- readConfigurationFromDisk(configurationPath, configurationSerializer));
- }
-
- /**
- * Instantiate REEF DriverServiceLauncher. This method is called from REEFLauncher.main().
- * @param clockConfigPath Path to the local file that contains serialized configuration
- * for the driver client.
- * @return An instance of the configured REEFLauncher object.
- */
- private static JavaDriverClientLauncher getLauncher(final String clockConfigPath, final int driverServicePort) {
-
- try {
-
- final Configuration clockArgConfig = Configurations.merge(
- LAUNCHER_STATIC_CONFIG,
- DriverClientGrpcConfiguration.CONF
- .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort)
- .build(),
- TANG.newConfigurationBuilder()
- .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath)
- .build());
-
- return TANG.newInjector(clockArgConfig).getInstance(JavaDriverClientLauncher.class);
-
- } catch (final BindException ex) {
- throw fatal("Error in parsing the command line", ex);
- } catch (final InjectionException ex) {
- throw fatal("Unable to instantiate REEFLauncher.", ex);
- }
- }
-
- /**
- * Read configuration from a given file and deserialize it
- * into Tang configuration object that can be used for injection.
- * Configuration is currently serialized using Avro.
- * This method also prints full deserialized configuration into log.
- * @param configPath Path to the local file that contains serialized configuration
- * of a REEF component to launch (can be either Driver or Evaluator).
- * @param serializer An object to deserialize the configuration file.
- * @return Tang configuration read and deserialized from a given file.
- */
- private static Configuration readConfigurationFromDisk(
- final String configPath, final ConfigurationSerializer serializer) {
-
- LOG.log(Level.FINER, "Loading configuration file: {0}", configPath);
-
- final File evaluatorConfigFile = new File(configPath);
-
- if (!evaluatorConfigFile.exists()) {
- throw fatal(
- "Configuration file " + configPath + " does not exist. Can be an issue in job submission.",
- new FileNotFoundException(configPath));
- }
-
- if (!evaluatorConfigFile.canRead()) {
- throw fatal(
- "Configuration file " + configPath + " exists, but can't be read.",
- new IOException(configPath));
- }
-
- try {
-
- final Configuration config = serializer.fromFile(evaluatorConfigFile);
- LOG.log(Level.FINEST, "The configuration file loaded: {0}", configPath);
-
- return config;
-
- } catch (final IOException e) {
- throw fatal("Unable to parse the configuration file: " + configPath, e);
- }
- }
-
- /**
- * Launches a REEF client process (Driver or Evaluator).
- * @param args Command-line arguments.
- * Must be a single element containing local path to the configuration file.
- */
- @SuppressWarnings("checkstyle:illegalcatch")
- public static void main(final String[] args) {
-
- LOG.log(Level.INFO, "Entering JavaDriverClientLauncher.main().");
-
- LOG.log(Level.FINE, "JavaDriverClientLauncher started with user name [{0}]", System.getProperty("user.name"));
- LOG.log(Level.FINE, "JavaDriverClientLauncher started. Assertions are {0} in this process.",
- EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
-
- if (args.length != 2) {
- final String message = "JavaDriverClientLauncher have two and only two arguments to specify the runtime clock " +
- "configuration path and driver service port";
-
- throw fatal(message, new IllegalArgumentException(message));
- }
-
- final JavaDriverClientLauncher launcher = getLauncher(args[0], Integer.parseInt(args[1]));
-
- Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig));
- final Injector injector = Tang.Factory.getTang().newInjector(launcher.envConfig);
- try (final Clock reef = injector.getInstance(Clock.class)) {
- reef.run();
- } catch (final Throwable ex) {
- throw fatal("Unable to configure and start Clock.", ex);
- }
-
- ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after Clock.close():");
-
- LOG.log(Level.INFO, "Exiting REEFLauncher.main()");
-
- System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main()
- }
-
- /**
- * Wrap an exception into RuntimeException with a given message,
- * and write the same message and exception to the log.
- * @param msg an error message to log and pass into the RuntimeException.
- * @param t A Throwable exception to log and wrap.
- * @return a new Runtime exception wrapping a Throwable.
- */
- private static RuntimeException fatal(final String msg, final Throwable t) {
- LOG.log(Level.SEVERE, msg, t);
- return new RuntimeException(msg, t);
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/WindowsRuntimePathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/WindowsRuntimePathProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/WindowsRuntimePathProvider.java
new file mode 100644
index 0000000..9b6d02c
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/WindowsRuntimePathProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.runtime.common.files.RuntimePathProvider;
+
+import javax.inject.Inject;
+/**
+ * Supplies the java binary's path for HDInsight.
+ */
+public final class WindowsRuntimePathProvider implements RuntimePathProvider {
+
+ @Inject
+ public WindowsRuntimePathProvider() {
+ }
+
+ @Override
+ public String getPath() {
+ return "java";
+ }
+
+ @Override
+ public String toString() {
+ return getPath();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java
deleted file mode 100644
index 54645a0..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.client.events;
-
-import org.apache.reef.bridge.client.IDriverServiceClient;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.util.Optional;
-
-/**
- * Active context bridge.
- */
-public final class ActiveContextBridge implements ActiveContext {
-
- private final IDriverServiceClient driverServiceClient;
-
- private final String contextId;
-
- private final Optional<String> parentId;
-
- private final String evaluatorId;
-
- private final EvaluatorDescriptor evaluatorDescriptor;
-
- public ActiveContextBridge(
- final IDriverServiceClient driverServiceClient,
- final String contextId,
- final Optional<String> parentId,
- final String evaluatorId,
- final EvaluatorDescriptor evaluatorDescriptor) {
- this.driverServiceClient = driverServiceClient;
- this.contextId = contextId;
- this.parentId = parentId;
- this.evaluatorId = evaluatorId;
- this.evaluatorDescriptor = evaluatorDescriptor;
- }
-
- @Override
- public void close() {
- this.driverServiceClient.onContextClose(this.contextId);
- }
-
- @Override
- public void submitTask(final Configuration taskConf) {
- this.driverServiceClient.onContextSubmitTask(this.contextId, taskConf);
- }
-
- @Override
- public void submitContext(final Configuration contextConfiguration) {
- this.driverServiceClient.onContextSubmitContext(this.contextId, contextConfiguration);
- }
-
- @Override
- public void submitContextAndService(
- final Configuration contextConfiguration,
- final Configuration serviceConfiguration) {
- throw new UnsupportedOperationException("Service not supported");
- }
-
- @Override
- public void sendMessage(final byte[] message) {
- this.driverServiceClient.onContextMessage(this.contextId, message);
- }
-
- @Override
- public String getEvaluatorId() {
- return this.evaluatorId;
- }
-
- @Override
- public Optional<String> getParentId() {
- return this.parentId;
- }
-
- @Override
- public EvaluatorDescriptor getEvaluatorDescriptor() {
- return this.evaluatorDescriptor;
- }
-
- @Override
- public String getId() {
- return this.contextId;
- }
-}