You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/05/01 20:36:34 UTC
[2/6] reef git commit: [REEF-2003] Revise Driver Service to allow
static configuration.
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
new file mode 100644
index 0000000..ad7eaa8
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
@@ -0,0 +1,919 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.driver.service.grpc;
+
+import com.google.protobuf.ByteString;
+import io.grpc.*;
+import io.grpc.stub.StreamObserver;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.reef.bridge.driver.service.DriverClientException;
+import org.apache.reef.bridge.driver.service.IDriverService;
+import org.apache.reef.bridge.service.parameters.DriverClientCommand;
+import org.apache.reef.bridge.proto.*;
+import org.apache.reef.bridge.proto.Void;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
+import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
+import org.apache.reef.runtime.common.driver.idle.IdleMessage;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.OSUtils;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * GRPC DriverBridgeService that interacts with higher-level languages.
+ */
+public final class GRPCDriverService implements IDriverService {
+ private static final Logger LOG = Logger.getLogger(GRPCDriverService.class.getName());
+
+ private static final Void VOID = Void.newBuilder().build();
+
+ private Server server;
+
+ private Process driverProcess;
+
+ private DriverClientGrpc.DriverClientFutureStub clientStub;
+
+ private final Clock clock;
+
+ private final ConfigurationSerializer configurationSerializer;
+
+ private final EvaluatorRequestor evaluatorRequestor;
+
+ private final JVMProcessFactory jvmProcessFactory;
+
+ private final CLRProcessFactory clrProcessFactory;
+
+ private final TcpPortProvider tcpPortProvider;
+
+ private final String driverClientCommand;
+
+ private final Map<String, AllocatedEvaluator> allocatedEvaluatorMap = new HashMap<>();
+
+ private final Map<String, ActiveContext> activeContextMap = new HashMap<>();
+
+ private final Map<String, RunningTask> runningTaskMap = new HashMap<>();
+
+ private boolean stopped = false;
+
+ /**
+  * Injection constructor. It only stores its collaborators; the gRPC server and the
+  * driver client process are created later by {@code start()}.
+  *
+  * @param clock clock used to stop the driver and to schedule alarms
+  * @param evaluatorRequestor used to submit evaluator requests
+  * @param configurationSerializer used to parse serialized context/task configurations
+  * @param jvmProcessFactory factory for JVM evaluator processes
+  * @param clrProcessFactory factory for CLR evaluator processes
+  * @param tcpPortProvider source of candidate ports for the gRPC server
+  * @param driverClientCommand command line used to launch the driver client process
+  */
+ @Inject
+ private GRPCDriverService(
+     final Clock clock,
+     final EvaluatorRequestor evaluatorRequestor,
+     final ConfigurationSerializer configurationSerializer,
+     final JVMProcessFactory jvmProcessFactory,
+     final CLRProcessFactory clrProcessFactory,
+     final TcpPortProvider tcpPortProvider,
+     @Parameter(DriverClientCommand.class) final String driverClientCommand) {
+   // Assigned in parameter order.
+   this.clock = clock;
+   this.evaluatorRequestor = evaluatorRequestor;
+   this.configurationSerializer = configurationSerializer;
+   this.jvmProcessFactory = jvmProcessFactory;
+   this.clrProcessFactory = clrProcessFactory;
+   this.tcpPortProvider = tcpPortProvider;
+   this.driverClientCommand = driverClientCommand;
+ }
+
+ private void start() throws IOException, InterruptedException {
+ for (final Integer port : this.tcpPortProvider) {
+ try {
+ this.server = ServerBuilder.forPort(port)
+ .addService(new DriverBridgeServiceImpl())
+ .build()
+ .start();
+ LOG.info("Server started, listening on " + port);
+ break;
+ } catch (IOException e) {
+ LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port);
+ }
+ }
+ if (this.server == null || this.server.isTerminated()) {
+ throw new IOException("Unable to start gRPC server");
+ } else {
+ final String cmd = this.driverClientCommand + " " + this.server.getPort();
+ final String cmdOs = OSUtils.isWindows() ? "cmd.exe /c \"" + cmd + "\"" : cmd;
+ final String cmdStd = cmdOs + " 1> driverclient.stdout 2> driverclient.stderr";
+ this.driverProcess = Runtime.getRuntime().exec(cmdStd);
+ synchronized (this) {
+ // wait for driver client process to register
+ while (this.clientStub == null && driverProcessIsAlive()) {
+ this.wait(1000); // a second
+ }
+ }
+ if (driverProcessIsAlive()) {
+ Thread closeChildThread = new Thread() {
+ public void run() {
+ synchronized (GRPCDriverService.this) {
+ if (GRPCDriverService.this.driverProcess != null) {
+ GRPCDriverService.this.driverProcess.destroy();
+ GRPCDriverService.this.driverProcess = null;
+ }
+ }
+ }
+ };
+ Runtime.getRuntime().addShutdownHook(closeChildThread);
+ }
+ }
+ }
+
+ /**
+  * Stops the driver service without reporting a failure; delegates to
+  * {@code stop(null)}.
+  */
+ private void stop() {
+ stop(null);
+ }
+
+ /**
+  * Stops the clock, shuts down the gRPC server and destroys the driver client process.
+  * Once a call has run, the {@code stopped} flag is set (even if a step threw) and later
+  * calls return immediately.
+  *
+  * @param t failure handed to the clock, or {@code null} for a regular stop
+  */
+ private void stop(final Throwable t) {
+   if (stopped) {
+     return;
+   }
+   try {
+     if (t == null) {
+       clock.stop();
+     } else {
+       clock.stop(t);
+     }
+     if (server != null) {
+       LOG.log(Level.INFO, "Shutdown gRPC");
+       this.server.shutdown();
+       this.server = null;
+     }
+     if (this.driverProcess != null) {
+       LOG.log(Level.INFO, "Shutdown driver process");
+       this.driverProcess.destroy();
+       this.driverProcess = null;
+     }
+   } finally {
+     stopped = true;
+   }
+ }
+
+ /**
+  * Blocks the calling thread until the gRPC server has terminated; returns at once when
+  * there is no server. Awaiting on the main thread is needed because the grpc library
+  * uses daemon threads.
+  *
+  * @throws InterruptedException if interrupted while waiting
+  */
+ private void blockUntilShutdown() throws InterruptedException {
+   if (server == null) {
+     return;
+   }
+   server.awaitTermination();
+ }
+
+ /**
+  * Reports whether the driver client process is still running. The check asks the
+  * process for its exit value, which throws {@link IllegalThreadStateException} while
+  * the process has not yet terminated.
+  *
+  * @return true if a driver process exists and has not exited, false otherwise
+  */
+ private boolean driverProcessIsAlive() {
+   if (this.driverProcess == null) {
+     return false;
+   }
+   try {
+     this.driverProcess.exitValue();
+     return false;
+   } catch (final IllegalThreadStateException stillRunning) {
+     return true;
+   }
+ }
+
+ private EvaluatorDescriptorInfo toEvaluatorDescriptorInfo(final EvaluatorDescriptor descriptor) {
+ if (descriptor == null) {
+ return null;
+ } else {
+ return EvaluatorDescriptorInfo.newBuilder()
+ .setCores(descriptor.getNumberOfCores())
+ .setMemory(descriptor.getMemory())
+ .setRuntimeName(descriptor.getRuntimeName())
+ .build();
+ }
+ }
+
+ /**
+  * Asks the driver client whether it is idle and wraps the answer in an
+  * {@link IdleMessage}.
+  *
+  * When no client has registered yet the service reports itself idle. When the check
+  * fails, the service is stopped with the failure and idle is reported with a reason
+  * that names the failed check. If the calling thread was interrupted, its interrupt
+  * status is restored after the stop.
+  *
+  * @return the idle status of this component
+  */
+ @Override
+ public IdleMessage getIdleStatus() {
+   final String componentName = "Java Bridge DriverService";
+   if (this.clientStub == null) {
+     return new IdleMessage(componentName, "stub not initialized", true);
+   }
+   try {
+     final IdleStatus idleStatus = this.clientStub.idlenessCheckHandler(VOID).get();
+     LOG.log(Level.INFO, "is idle: {0}", idleStatus.getIsIdle());
+     return new IdleMessage(
+         componentName,
+         idleStatus.getReason(),
+         idleStatus.getIsIdle());
+   } catch (final ExecutionException e) {
+     stop(e);
+   } catch (final InterruptedException e) {
+     stop(e);
+     // Do not swallow the interrupt: let the caller observe it.
+     Thread.currentThread().interrupt();
+   }
+   return new IdleMessage(componentName, "driver client idleness check failed", true);
+ }
+
+ /**
+  * Starts the gRPC server and the driver client, then forwards the start time to the
+  * client. If no client registered, or start-up fails, the service is stopped with the
+  * corresponding failure. An interrupt received during start-up is restored on the
+  * calling thread after the stop.
+  *
+  * @param startTime the driver start event
+  */
+ @Override
+ public void startHandler(final StartTime startTime) {
+   try {
+     start();
+     synchronized (this) {
+       if (this.clientStub != null) {
+         this.clientStub.startHandler(
+             StartTimeInfo.newBuilder().setStartTime(startTime.getTimestamp()).build());
+       } else {
+         stop(new IllegalStateException("Unable to start driver client"));
+       }
+     }
+   } catch (final IOException e) {
+     stop(e);
+   } catch (final InterruptedException e) {
+     stop(e);
+     // Preserve the interrupt for code further up the stack.
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
+  * Forwards the stop time to the driver client, if one is registered, and then stops
+  * this service regardless of whether forwarding succeeded.
+  *
+  * @param stopTime the driver stop event
+  */
+ @Override
+ public void stopHandler(final StopTime stopTime) {
+   synchronized (this) {
+     try {
+       if (clientStub != null) {
+         final StopTimeInfo stopInfo =
+             StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build();
+         this.clientStub.stopHandler(stopInfo);
+       }
+     } finally {
+       stop();
+     }
+   }
+ }
+
+ /**
+  * Records the allocated evaluator under its id and forwards the id and descriptor to
+  * the driver client.
+  *
+  * @param eval the allocated evaluator
+  */
+ @Override
+ public void allocatedEvaluatorHandler(final AllocatedEvaluator eval) {
+   synchronized (this) {
+     this.allocatedEvaluatorMap.put(eval.getId(), eval);
+     final EvaluatorInfo.Builder info = EvaluatorInfo.newBuilder();
+     info.setEvaluatorId(eval.getId());
+     info.setDescriptorInfo(toEvaluatorDescriptorInfo(eval.getEvaluatorDescriptor()));
+     this.clientStub.allocatedEvaluatorHandler(info.build());
+   }
+ }
+
+ /**
+  * Forgets the completed evaluator and forwards its id to the driver client.
+  *
+  * @param eval the completed evaluator
+  */
+ @Override
+ public void completedEvaluatorHandler(final CompletedEvaluator eval) {
+   synchronized (this) {
+     this.allocatedEvaluatorMap.remove(eval.getId());
+     final EvaluatorInfo info = EvaluatorInfo.newBuilder()
+         .setEvaluatorId(eval.getId())
+         .build();
+     this.clientStub.completedEvaluatorHandler(info);
+   }
+ }
+
+ /**
+  * Forgets the failed evaluator and forwards its id to the driver client.
+  *
+  * @param eval the failed evaluator
+  */
+ @Override
+ public void failedEvaluatorHandler(final FailedEvaluator eval) {
+   synchronized (this) {
+     this.allocatedEvaluatorMap.remove(eval.getId());
+     final EvaluatorInfo info = EvaluatorInfo.newBuilder()
+         .setEvaluatorId(eval.getId())
+         .build();
+     this.clientStub.failedEvaluatorHandler(info);
+   }
+ }
+
+ /**
+  * Records the active context under its id and forwards its description to the driver
+  * client.
+  *
+  * A context without a parent is reported with an empty parent id. Protobuf builder
+  * setters do not accept {@code null}, and the task handlers of this class use the same
+  * empty-string convention.
+  *
+  * @param context the context that became active
+  */
+ @Override
+ public void activeContextHandler(final ActiveContext context) {
+   synchronized (this) {
+     this.activeContextMap.put(context.getId(), context);
+     this.clientStub.activeContextHandler(
+         ContextInfo.newBuilder()
+             .setContextId(context.getId())
+             .setEvaluatorId(context.getEvaluatorId())
+             .setParentId(
+                 context.getParentId().isPresent() ?
+                     context.getParentId().get() : "")
+             .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                 context.getEvaluatorDescriptor()))
+             .build());
+   }
+ }
+
+ /**
+  * Forgets the closed context and forwards its description, including the id of its
+  * parent context, to the driver client.
+  *
+  * @param context the context that was closed
+  */
+ @Override
+ public void closedContextHandler(final ClosedContext context) {
+   synchronized (this) {
+     this.activeContextMap.remove(context.getId());
+     final ContextInfo.Builder info = ContextInfo.newBuilder();
+     info.setContextId(context.getId());
+     info.setEvaluatorId(context.getEvaluatorId());
+     info.setParentId(context.getParentContext().getId());
+     info.setEvaluatorDescriptorInfo(
+         toEvaluatorDescriptorInfo(context.getEvaluatorDescriptor()));
+     this.clientStub.closedContextHandler(info.build());
+   }
+ }
+
+ /**
+  * Forgets the failed context and reports the failure to the driver client through the
+  * client's failed-context handler (not its closed-context handler).
+  *
+  * A context without a parent is reported with an empty parent id, because protobuf
+  * builder setters do not accept {@code null}.
+  *
+  * @param context the context that failed
+  */
+ @Override
+ public void failedContextHandler(final FailedContext context) {
+   synchronized (this) {
+     this.activeContextMap.remove(context.getId());
+     this.clientStub.failedContextHandler(
+         ContextInfo.newBuilder()
+             .setContextId(context.getId())
+             .setEvaluatorId(context.getEvaluatorId())
+             .setParentId(
+                 context.getParentContext().isPresent() ?
+                     context.getParentContext().get().getId() : "")
+             .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                 context.getEvaluatorDescriptor()))
+             .build());
+   }
+ }
+
+ /**
+  * Forwards a context message (context id, message source id, sequence number and
+  * payload) to the driver client.
+  *
+  * @param message the message sent by a context
+  */
+ @Override
+ public void contextMessageHandler(final ContextMessage message) {
+   synchronized (this) {
+     final ContextMessageInfo.Builder info = ContextMessageInfo.newBuilder();
+     info.setContextId(message.getId());
+     info.setMessageSourceId(message.getMessageSourceID());
+     info.setSequenceNumber(message.getSequenceNumber());
+     info.setPayload(ByteString.copyFrom(message.get()));
+     this.clientStub.contextMessageHandler(info.build());
+   }
+ }
+
+ /**
+  * Records the running task, and its context if that context is not yet known, then
+  * forwards the task description to the driver client. A context without a parent is
+  * reported with an empty parent id.
+  *
+  * @param task the task that started running
+  */
+ @Override
+ public void runningTaskHandler(final RunningTask task) {
+   synchronized (this) {
+     final ActiveContext context = task.getActiveContext();
+     if (!this.activeContextMap.containsKey(context.getId())) {
+       this.activeContextMap.put(context.getId(), context);
+     }
+     this.runningTaskMap.put(task.getId(), task);
+
+     final String parentId = context.getParentId().isPresent() ? context.getParentId().get() : "";
+     final ContextInfo contextInfo = ContextInfo.newBuilder()
+         .setContextId(context.getId())
+         .setEvaluatorId(context.getEvaluatorId())
+         .setParentId(parentId)
+         .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+             task.getActiveContext().getEvaluatorDescriptor()))
+         .build();
+     final TaskInfo taskInfo = TaskInfo.newBuilder()
+         .setTaskId(task.getId())
+         .setContext(contextInfo)
+         .build();
+     this.clientStub.runningTaskHandler(taskInfo);
+   }
+ }
+
+ /**
+  * Forgets the failed task and forwards the failure to the driver client.
+  *
+  * If the task still has an active context, that context is recorded (when not yet
+  * known) and described in the message. If it has none, the context field is left unset
+  * rather than set to {@code null}, which protobuf builder setters do not accept.
+  *
+  * @param task the task that failed
+  */
+ @Override
+ public void failedTaskHandler(final FailedTask task) {
+   synchronized (this) {
+     final TaskInfo.Builder taskInfo = TaskInfo.newBuilder().setTaskId(task.getId());
+     if (task.getActiveContext().isPresent()) {
+       final ActiveContext context = task.getActiveContext().get();
+       if (!this.activeContextMap.containsKey(context.getId())) {
+         this.activeContextMap.put(context.getId(), context);
+       }
+       taskInfo.setContext(ContextInfo.newBuilder()
+           .setContextId(context.getId())
+           .setEvaluatorId(context.getEvaluatorId())
+           .setParentId(context.getParentId().isPresent() ? context.getParentId().get() : "")
+           .build());
+     }
+     this.runningTaskMap.remove(task.getId());
+     this.clientStub.failedTaskHandler(taskInfo.build());
+   }
+ }
+
+ /**
+  * Forgets the completed task, records its context if that context is not yet known,
+  * and forwards the task description to the driver client. A context without a parent
+  * is reported with an empty parent id.
+  *
+  * @param task the task that completed
+  */
+ @Override
+ public void completedTaskHandler(final CompletedTask task) {
+   synchronized (this) {
+     final ActiveContext context = task.getActiveContext();
+     if (!this.activeContextMap.containsKey(context.getId())) {
+       this.activeContextMap.put(context.getId(), context);
+     }
+     this.runningTaskMap.remove(task.getId());
+
+     final String parentId = context.getParentId().isPresent() ? context.getParentId().get() : "";
+     final ContextInfo contextInfo = ContextInfo.newBuilder()
+         .setContextId(context.getId())
+         .setEvaluatorId(context.getEvaluatorId())
+         .setParentId(parentId)
+         .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(context.getEvaluatorDescriptor()))
+         .build();
+     this.clientStub.completedTaskHandler(
+         TaskInfo.newBuilder().setTaskId(task.getId()).setContext(contextInfo).build());
+   }
+ }
+
+ /**
+  * Forgets the suspended task, records its context if that context is not yet known,
+  * and forwards the task description to the driver client. The context description
+  * sent here carries no evaluator descriptor.
+  *
+  * @param task the task that was suspended
+  */
+ @Override
+ public void suspendedTaskHandler(final SuspendedTask task) {
+   synchronized (this) {
+     final ActiveContext context = task.getActiveContext();
+     if (!this.activeContextMap.containsKey(context.getId())) {
+       this.activeContextMap.put(context.getId(), context);
+     }
+     this.runningTaskMap.remove(task.getId());
+
+     final String parentId = context.getParentId().isPresent() ? context.getParentId().get() : "";
+     final ContextInfo contextInfo = ContextInfo.newBuilder()
+         .setContextId(context.getId())
+         .setEvaluatorId(context.getEvaluatorId())
+         .setParentId(parentId)
+         .build();
+     this.clientStub.suspendedTaskHandler(
+         TaskInfo.newBuilder().setTaskId(task.getId()).setContext(contextInfo).build());
+   }
+ }
+
+ /**
+  * Forwards a task message (task id, context id, message source id, sequence number
+  * and payload) to the driver client.
+  *
+  * @param message the message sent by a task
+  */
+ @Override
+ public void taskMessageHandler(final TaskMessage message) {
+   synchronized (this) {
+     final TaskMessageInfo.Builder info = TaskMessageInfo.newBuilder();
+     info.setTaskId(message.getId());
+     info.setContextId(message.getContextId());
+     info.setMessageSourceId(message.getMessageSourceID());
+     info.setSequenceNumber(message.getSequenceNumber());
+     info.setPayload(ByteString.copyFrom(message.get()));
+     this.clientStub.taskMessageHandler(info.build());
+   }
+ }
+
+ /**
+  * Forwards a message from the job client to the driver client.
+  *
+  * @param message the raw message bytes
+  */
+ @Override
+ public void clientMessageHandler(final byte[] message) {
+   synchronized (this) {
+     final ClientMessageInfo info = ClientMessageInfo.newBuilder()
+         .setPayload(ByteString.copyFrom(message))
+         .build();
+     this.clientStub.clientMessageHandler(info);
+   }
+ }
+
+ /**
+  * Notifies the driver client that the job client requested a close; no payload is sent.
+  */
+ @Override
+ public void clientCloseHandler() {
+   synchronized (this) {
+     clientStub.clientCloseHandler(VOID);
+   }
+ }
+
+ /**
+  * Notifies the driver client that the job client requested a close and passes along
+  * the accompanying message.
+  *
+  * @param message the raw message bytes sent with the close request
+  */
+ @Override
+ public void clientCloseWithMessageHandler(final byte[] message) {
+   synchronized (this) {
+     final ClientMessageInfo info = ClientMessageInfo.newBuilder()
+         .setPayload(ByteString.copyFrom(message))
+         .build();
+     this.clientStub.clientCloseWithMessageHandler(info);
+   }
+ }
+
+ /**
+  * Starts the gRPC server and the driver client after a driver restart, then forwards
+  * the restart information (resubmission attempts, start time and expected evaluator
+  * ids) to the client. If no client registered, or start-up fails, the service is
+  * stopped with the corresponding failure. An interrupt received during start-up is
+  * restored on the calling thread after the stop.
+  *
+  * @param restart the driver restart event
+  */
+ @Override
+ public void driverRestarted(final DriverRestarted restart) {
+   try {
+     start();
+     synchronized (this) {
+       if (this.clientStub != null) {
+         this.clientStub.driverRestartHandler(DriverRestartInfo.newBuilder()
+             .setResubmissionAttempts(restart.getResubmissionAttempts())
+             .setStartTime(StartTimeInfo.newBuilder()
+                 .setStartTime(restart.getStartTime().getTimestamp()).build())
+             .addAllExpectedEvaluatorIds(restart.getExpectedEvaluatorIds())
+             .build());
+       } else {
+         stop(new DriverClientException("Failed to restart driver client"));
+       }
+     }
+   } catch (final IOException e) {
+     stop(e);
+   } catch (final InterruptedException e) {
+     stop(e);
+     // Preserve the interrupt for code further up the stack.
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
+  * Records a task that was found running after a driver restart, together with its
+  * context if that context is not yet known, and forwards the task description to the
+  * driver client's restart handler.
+  *
+  * @param task the running task recovered on restart
+  */
+ @Override
+ public void restartRunningTask(final RunningTask task) {
+   synchronized (this) {
+     final ActiveContext context = task.getActiveContext();
+     if (!this.activeContextMap.containsKey(context.getId())) {
+       this.activeContextMap.put(context.getId(), context);
+     }
+     this.runningTaskMap.put(task.getId(), task);
+
+     final String parentId = context.getParentId().isPresent() ? context.getParentId().get() : "";
+     final ContextInfo contextInfo = ContextInfo.newBuilder()
+         .setContextId(context.getId())
+         .setEvaluatorId(context.getEvaluatorId())
+         .setParentId(parentId)
+         .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(context.getEvaluatorDescriptor()))
+         .build();
+     this.clientStub.driverRestartRunningTaskHandler(
+         TaskInfo.newBuilder().setTaskId(task.getId()).setContext(contextInfo).build());
+   }
+ }
+
+ /**
+  * Records a context that was found active after a driver restart and forwards its
+  * description to the driver client's restart handler.
+  *
+  * A context without a parent is reported with an empty parent id, because protobuf
+  * builder setters do not accept {@code null}.
+  *
+  * @param context the active context recovered on restart
+  */
+ @Override
+ public void restartActiveContext(final ActiveContext context) {
+   synchronized (this) {
+     this.activeContextMap.put(context.getId(), context);
+     this.clientStub.driverRestartActiveContextHandler(
+         ContextInfo.newBuilder()
+             .setContextId(context.getId())
+             .setEvaluatorId(context.getEvaluatorId())
+             .setParentId(
+                 context.getParentId().isPresent() ?
+                     context.getParentId().get() : "")
+             .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                 context.getEvaluatorDescriptor()))
+             .build());
+   }
+ }
+
+ /**
+  * Tells the driver client that the driver restart has completed, passing the
+  * completion time and whether the restart timed out.
+  *
+  * @param restartCompleted the restart-completed event
+  */
+ @Override
+ public void driverRestartCompleted(final DriverRestartCompleted restartCompleted) {
+   synchronized (this) {
+     final StopTimeInfo completionTime = StopTimeInfo.newBuilder()
+         .setStopTime(restartCompleted.getCompletedTime().getTimestamp())
+         .build();
+     final DriverRestartCompletedInfo info = DriverRestartCompletedInfo.newBuilder()
+         .setCompletionTime(completionTime)
+         .setIsTimedOut(restartCompleted.isTimedOut())
+         .build();
+     this.clientStub.driverRestartCompletedHandler(info);
+   }
+ }
+
+ /**
+  * Reports an evaluator that failed during driver restart to the driver client. The
+  * failure message is taken from the evaluator exception when there is one; otherwise
+  * a generic message is used.
+  *
+  * @param evaluator the evaluator that failed during restart
+  */
+ @Override
+ public void restartFailedEvalautor(final FailedEvaluator evaluator) {
+   synchronized (this) {
+     final String failureMessage;
+     if (evaluator.getEvaluatorException() != null) {
+       failureMessage = evaluator.getEvaluatorException().getMessage();
+     } else {
+       failureMessage = "unknown failure during restart";
+     }
+     final EvaluatorInfo.FailureInfo failure = EvaluatorInfo.FailureInfo.newBuilder()
+         .setMessage(failureMessage)
+         .build();
+     this.clientStub.driverRestartFailedEvaluatorHandler(EvaluatorInfo.newBuilder()
+         .setEvaluatorId(evaluator.getId())
+         .setFailure(failure)
+         .build());
+   }
+ }
+
+ private final class DriverBridgeServiceImpl
+ extends DriverServiceGrpc.DriverServiceImplBase {
+
+ /**
+  * Handles the registration call of the driver client: opens a plaintext channel to
+  * the host and port given in the request, stores a future stub for it on the
+  * enclosing service, and wakes up any thread waiting for the registration.
+  *
+  * @param request host and port on which the driver client listens
+  * @param responseObserver observer used to complete the call
+  */
+ @Override
+ public void registerDriverClient(
+     final DriverClientRegistration request,
+     final StreamObserver<Void> responseObserver) {
+   try {
+     final ManagedChannel channel = ManagedChannelBuilder
+         .forAddress(request.getHost(), request.getPort())
+         .usePlaintext(true)
+         .build();
+     final GRPCDriverService service = GRPCDriverService.this;
+     synchronized (service) {
+       service.clientStub = DriverClientGrpc.newFutureStub(channel);
+       service.notifyAll();
+     }
+   } finally {
+     responseObserver.onNext(null);
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Translates a resource request from the driver client into an evaluator request
+  * (count, cores, memory, locality relaxation, runtime name, node names and rack
+  * names) and submits it through the evaluator requestor.
+  *
+  * @param request the resource request sent by the driver client
+  * @param responseObserver observer used to complete the call
+  */
+ @Override
+ public void requestResources(
+     final ResourceRequest request,
+     final StreamObserver<Void> responseObserver) {
+   try {
+     final GRPCDriverService service = GRPCDriverService.this;
+     synchronized (service) {
+       EvaluatorRequest.Builder requestBuilder = service.evaluatorRequestor.newRequest();
+       requestBuilder.setNumber(request.getResourceCount());
+       requestBuilder.setNumberOfCores(request.getCores());
+       requestBuilder.setMemory(request.getMemorySize());
+       requestBuilder.setRelaxLocality(request.getRelaxLocality());
+       requestBuilder.setRuntimeName(request.getRuntimeName());
+       if (request.getNodeNameListCount() > 0) {
+         requestBuilder.addNodeNames(request.getNodeNameListList());
+       }
+       // Iterating an empty rack list adds nothing, so no count check is needed.
+       for (final String rackName : request.getRackNameListList()) {
+         requestBuilder.addRackName(rackName);
+       }
+       service.evaluatorRequestor.submit(requestBuilder.build());
+     }
+   } finally {
+     responseObserver.onNext(null);
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Stops the driver clock at the request of the driver client.
+  *
+  * The clock is stopped with a {@link DriverClientException} only when the request
+  * actually carries an exception. Presence is tested with {@code hasException()}
+  * because a protobuf message getter never returns {@code null}; a null check would
+  * treat every shutdown as a failure.
+  *
+  * @param request the shutdown request, optionally carrying an exception
+  * @param responseObserver observer used to complete the call
+  */
+ @Override
+ public void shutdown(
+     final ShutdownRequest request,
+     final StreamObserver<Void> responseObserver) {
+   try {
+     synchronized (GRPCDriverService.this) {
+       if (request.hasException()) {
+         GRPCDriverService.this.clock.stop(
+             new DriverClientException(request.getException().getMessage()));
+       } else {
+         GRPCDriverService.this.clock.stop();
+       }
+     }
+   } finally {
+     responseObserver.onNext(VOID);
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Schedules an alarm on the driver clock with the timeout given in the request. When
+  * the alarm fires, the driver client is notified with the alarm id from the request.
+  *
+  * @param request alarm id and timeout
+  * @param responseObserver observer used to complete the call
+  */
+ @Override
+ public void setAlarm(
+     final AlarmRequest request,
+     final StreamObserver<Void> responseObserver) {
+   final EventHandler<Alarm> onAlarm = new EventHandler<Alarm>() {
+     @Override
+     public void onNext(final Alarm value) {
+       synchronized (GRPCDriverService.this) {
+         GRPCDriverService.this.clientStub.alarmTrigger(
+             AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build());
+       }
+     }
+   };
+   try {
+     synchronized (GRPCDriverService.this) {
+       GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), onAlarm);
+     }
+   } finally {
+     responseObserver.onNext(null);
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Performs an operation on an allocated evaluator on behalf of the driver client:
+  * either closes it, or adds files and libraries, optionally replaces its process, and
+  * submits a context and/or a task.
+  *
+  * Presence of the string configurations is tested with emptiness checks, and presence
+  * of the process request with {@code hasSetProcess()}, because protobuf getters never
+  * return {@code null}. An unknown evaluator id, or a submission that carries neither a
+  * context nor a task configuration, is answered with an {@code INTERNAL} error and
+  * nothing else is sent on the observer afterwards.
+  *
+  * @param request the operation to perform
+  * @param responseObserver observer used to complete the call
+  */
+ @Override
+ public void allocatedEvaluatorOp(
+     final AllocatedEvaluatorRequest request,
+     final StreamObserver<Void> responseObserver) {
+   final boolean hasContext = StringUtils.isNotEmpty(request.getContextConfiguration());
+   final boolean hasTask = StringUtils.isNotEmpty(request.getTaskConfiguration());
+   synchronized (GRPCDriverService.this) {
+     final AllocatedEvaluator evaluator =
+         GRPCDriverService.this.allocatedEvaluatorMap.get(request.getEvaluatorId());
+     if (evaluator == null) {
+       responseObserver.onError(Status.INTERNAL
+           .withDescription("Unknown allocated evaluator " + request.getEvaluatorId())
+           .asRuntimeException());
+       return;
+     }
+     if (!request.getCloseEvaluator() && !hasContext && !hasTask) {
+       responseObserver.onError(Status.INTERNAL
+           .withDescription("Context and/or Task configuration required")
+           .asRuntimeException());
+       return;
+     }
+     try {
+       if (request.getCloseEvaluator()) {
+         evaluator.close();
+       } else {
+         for (final String file : request.getAddFilesList()) {
+           evaluator.addFile(new File(file));
+         }
+         for (final String library : request.getAddLibrariesList()) {
+           evaluator.addLibrary(new File(library));
+         }
+         if (request.hasSetProcess()) {
+           applyProcessRequest(evaluator, request.getSetProcess());
+         }
+         submitConfigurations(evaluator, request, hasContext, hasTask);
+       }
+     } finally {
+       responseObserver.onNext(VOID);
+       responseObserver.onCompleted();
+     }
+   }
+ }
+
+ /**
+  * Replaces the evaluator's process with one built from the request, choosing the JVM
+  * or CLR variant according to the evaluator's current process type.
+  */
+ private void applyProcessRequest(
+     final AllocatedEvaluator evaluator,
+     final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
+   switch (evaluator.getEvaluatorDescriptor().getProcess().getType()) {
+   case JVM:
+     setJVMProcess(evaluator, processRequest);
+     break;
+   case CLR:
+     setCLRProcess(evaluator, processRequest);
+     break;
+   default:
+     throw new RuntimeException("Unknown evaluator process type");
+   }
+ }
+
+ /**
+  * Submits the context and/or task named in the request to the evaluator. Without an
+  * evaluator configuration the request is assumed to come from a Java driver client and
+  * the configurations are deserialized first; otherwise the serialized strings are
+  * passed through to the evaluator implementation unchanged.
+  */
+ private void submitConfigurations(
+     final AllocatedEvaluator evaluator,
+     final AllocatedEvaluatorRequest request,
+     final boolean hasContext,
+     final boolean hasTask) {
+   final String evaluatorConf = request.getEvaluatorConfiguration();
+   final String contextConf = request.getContextConfiguration();
+   final String taskConf = request.getTaskConfiguration();
+   if (StringUtils.isEmpty(evaluatorConf)) {
+     // Assume that we are running Java driver client, but this assumption could be a bug so log a warning
+     LOG.log(Level.WARNING, "No evaluator configuration detected. Assuming a Java driver client.");
+     try {
+       if (hasContext && hasTask) {
+         evaluator.submitContextAndTask(
+             configurationSerializer.fromString(contextConf),
+             configurationSerializer.fromString(taskConf));
+       } else if (hasContext) {
+         evaluator.submitContext(configurationSerializer.fromString(contextConf));
+       } else if (hasTask) {
+         evaluator.submitTask(configurationSerializer.fromString(taskConf));
+       } else {
+         throw new RuntimeException("Missing check for required evaluator configurations");
+       }
+     } catch (final IOException e) {
+       throw new RuntimeException(e);
+     }
+   } else {
+     final AllocatedEvaluatorImpl evaluatorImpl = (AllocatedEvaluatorImpl) evaluator;
+     if (hasContext && hasTask) {
+       evaluatorImpl.submitContextAndTask(evaluatorConf, contextConf, taskConf);
+     } else if (hasContext) {
+       evaluatorImpl.submitContext(evaluatorConf, contextConf);
+     } else if (hasTask) {
+       evaluatorImpl.submitTask(evaluatorConf, taskConf);
+     } else {
+       throw new RuntimeException("Missing check for required evaluator configurations");
+     }
+   }
+ }
+
+ /**
+  * Performs an operation on an active context on behalf of the driver client: close
+  * it, send it a message, or submit a child context or a task to it.
+  *
+  * Every path answers the call exactly once. An unknown context id, a close request
+  * whose flag is false, or a request with no supported operation set is answered with
+  * an {@code INTERNAL} error, so the caller always receives a reply.
+  *
+  * @param request the operation to perform
+  * @param responseObserver observer used to complete the call
+  */
+ @Override
+ public void activeContextOp(
+     final ActiveContextRequest request,
+     final StreamObserver<Void> responseObserver) {
+   synchronized (GRPCDriverService.this) {
+     final ActiveContext context = GRPCDriverService.this.activeContextMap.get(request.getContextId());
+     if (context == null) {
+       LOG.log(Level.SEVERE, "Context does not exist with id " + request.getContextId());
+       responseObserver.onError(Status.INTERNAL
+           .withDescription("Context does not exist with id " + request.getContextId())
+           .asRuntimeException());
+       return;
+     }
+     switch (request.getOperationCase()) {
+     case CLOSE_CONTEXT:
+       if (request.getCloseContext()) {
+         try {
+           LOG.log(Level.INFO, "closing context " + context.getId());
+           context.close();
+         } finally {
+           responseObserver.onNext(VOID);
+           responseObserver.onCompleted();
+         }
+       } else {
+         LOG.log(Level.SEVERE, "Close context operation not set to true");
+         responseObserver.onError(Status.INTERNAL
+             .withDescription("Close context operation not set to true")
+             .asRuntimeException());
+       }
+       break;
+     case MESSAGE:
+       try {
+         LOG.log(Level.INFO, "send message to context " + context.getId());
+         context.sendMessage(request.getMessage().toByteArray());
+       } finally {
+         responseObserver.onNext(VOID);
+         responseObserver.onCompleted();
+       }
+       break;
+     case NEW_CONTEXT_REQUEST:
+       try {
+         LOG.log(Level.INFO, "submitting child context to context " + context.getId());
+         ((EvaluatorContext) context).submitContext(request.getNewContextRequest());
+       } finally {
+         responseObserver.onNext(VOID);
+         responseObserver.onCompleted();
+       }
+       break;
+     case NEW_TASK_REQUEST:
+       try {
+         LOG.log(Level.INFO, "submitting task to context " + context.getId());
+         ((EvaluatorContext) context).submitTask(request.getNewTaskRequest());
+       } finally {
+         responseObserver.onNext(VOID);
+         responseObserver.onCompleted();
+       }
+       break;
+     default:
+       // No operation set, or one this service does not know: reply instead of
+       // leaving the call unanswered.
+       LOG.log(Level.SEVERE, "Unsupported context operation " + request.getOperationCase());
+       responseObserver.onError(Status.INTERNAL
+           .withDescription("Unsupported context operation " + request.getOperationCase())
+           .asRuntimeException());
+       break;
+     }
+   }
+ }
+
+ /**
+  * Performs an operation on a running task on behalf of the driver client: close it
+  * (with an optional message) or send it a message.
+  *
+  * An unknown task id is answered with an {@code INTERNAL} error and nothing further is
+  * done. The message is treated as absent when it is empty, because a protobuf getter
+  * never returns {@code null}; a close request without a message therefore results in
+  * a plain close, and a non-close request without a message does nothing.
+  *
+  * @param request the operation to perform
+  * @param responseObserver observer used to complete the call
+  */
+ @Override
+ public void runningTaskOp(
+     final RunningTaskRequest request,
+     final StreamObserver<Void> responseObserver) {
+   synchronized (GRPCDriverService.this) {
+     final RunningTask task = GRPCDriverService.this.runningTaskMap.get(request.getTaskId());
+     if (task == null) {
+       responseObserver.onError(Status.INTERNAL
+           .withDescription("Task does not exist with id " + request.getTaskId()).asRuntimeException());
+       return;
+     }
+     try {
+       final byte[] payload = request.getMessage().toByteArray();
+       final boolean hasPayload = payload.length > 0;
+       if (request.getCloseTask()) {
+         if (hasPayload) {
+           task.close(payload);
+         } else {
+           task.close();
+         }
+       } else if (hasPayload) {
+         task.send(payload);
+       }
+     } finally {
+       responseObserver.onNext(VOID);
+       responseObserver.onCompleted();
+     }
+   }
+ }
+
+ /**
+  * Builds a CLR evaluator process from the request and installs it on the evaluator.
+  *
+  * Only values actually present in the request are applied: memory when positive,
+  * and the configuration file name, standard-out and standard-error paths when
+  * non-empty. Protobuf string getters return an empty string rather than {@code null}
+  * for unset fields, so emptiness (not nullness) is what marks a field as absent;
+  * absent fields keep whatever the factory-created process already has.
+  *
+  * @param evaluator the evaluator whose process is replaced
+  * @param processRequest the requested process settings
+  */
+ private void setCLRProcess(
+     final AllocatedEvaluator evaluator,
+     final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
+   final CLRProcess process = GRPCDriverService.this.clrProcessFactory.newEvaluatorProcess();
+   if (processRequest.getMemoryMb() > 0) {
+     process.setMemory(processRequest.getMemoryMb());
+   }
+   if (StringUtils.isNotEmpty(processRequest.getConfigurationFileName())) {
+     process.setConfigurationFileName(processRequest.getConfigurationFileName());
+   }
+   if (StringUtils.isNotEmpty(processRequest.getStandardOut())) {
+     process.setStandardOut(processRequest.getStandardOut());
+   }
+   if (StringUtils.isNotEmpty(processRequest.getStandardErr())) {
+     process.setStandardErr(processRequest.getStandardErr());
+   }
+   evaluator.setProcess(process);
+ }
+
+ /**
+  * Builds a JVM evaluator process from the request and installs it on the evaluator.
+  *
+  * Only values actually present in the request are applied: memory when positive,
+  * the configuration file name, standard-out and standard-error paths when non-empty,
+  * and every listed option. Protobuf string getters return an empty string rather
+  * than {@code null} for unset fields, so emptiness (not nullness) is what marks a
+  * field as absent; absent fields keep whatever the factory-created process already has.
+  *
+  * @param evaluator the evaluator whose process is replaced
+  * @param processRequest the requested process settings
+  */
+ private void setJVMProcess(
+     final AllocatedEvaluator evaluator,
+     final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
+   final JVMProcess process = GRPCDriverService.this.jvmProcessFactory.newEvaluatorProcess();
+   if (processRequest.getMemoryMb() > 0) {
+     process.setMemory(processRequest.getMemoryMb());
+   }
+   if (StringUtils.isNotEmpty(processRequest.getConfigurationFileName())) {
+     process.setConfigurationFileName(processRequest.getConfigurationFileName());
+   }
+   if (StringUtils.isNotEmpty(processRequest.getStandardOut())) {
+     process.setStandardOut(processRequest.getStandardOut());
+   }
+   if (StringUtils.isNotEmpty(processRequest.getStandardErr())) {
+     process.setStandardErr(processRequest.getStandardErr());
+   }
+   // An empty option list simply adds nothing.
+   for (final String option : processRequest.getOptionsList()) {
+     process.addOption(option);
+   }
+   evaluator.setProcess(process);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java
new file mode 100644
index 0000000..f0334c5
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.service.grpc;
+
+import org.apache.reef.bridge.driver.service.DriverServiceConfiguration;
+import org.apache.reef.bridge.driver.service.DriverServiceConfigurationProviderBase;
+import org.apache.reef.bridge.proto.ClientProtocol;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+
+import javax.inject.Inject;
+import java.util.logging.Logger;
+
+/**
+ * Configuration provider that wires the gRPC implementation of the driver service.
+ */
+public final class GRPCDriverServiceConfigurationProvider extends DriverServiceConfigurationProviderBase {
+
+  private static final Logger LOG = Logger.getLogger(GRPCDriverServiceConfigurationProvider.class.getName());
+
+  @Inject
+  private GRPCDriverServiceConfigurationProvider() {
+  }
+
+  /**
+   * Assembles the driver service configuration for the given client configuration.
+   *
+   * The result merges three parts: the driver service module bound to
+   * {@link GRPCDriverService} and the client launch command, the driver
+   * configuration, and the TCP port range configuration (the latter two are
+   * produced by the base class helpers).
+   *
+   * @param driverConfiguration the driver client configuration protocol message.
+   * @return the merged configuration.
+   */
+  @Override
+  public Configuration getConfiguration(final ClientProtocol.DriverClientConfiguration driverConfiguration) {
+    return Configurations.merge(
+        DriverServiceConfiguration.CONF
+            .set(DriverServiceConfiguration.DRIVER_SERVICE_IMPL, GRPCDriverService.class)
+            .set(DriverServiceConfiguration.DRIVER_CLIENT_COMMAND, driverConfiguration.getDriverClientLaunchCommand())
+            .build(),
+        getDriverConfiguration(driverConfiguration),
+        getTcpPortRangeConfiguration(driverConfiguration));
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java
new file mode 100644
index 0000000..a94328d
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * GRPC implementation for driver bridge service.
+ */
+package org.apache.reef.bridge.driver.service.grpc;
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java
new file mode 100644
index 0000000..9b58cb0
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The Java-side of the CLR/Java bridge interop via gRPC/Protocol Buffers.
+ */
+package org.apache.reef.bridge.driver.service;
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java
new file mode 100644
index 0000000..255f60d
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.service.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Named parameter holding the command (a string) used to launch the bridge
+ * driver process. Its short name is "command".
+ */
+@NamedParameter(doc = "The command to launch bridge driver process",
+ short_name = "command")
+public final class DriverClientCommand implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java
new file mode 100644
index 0000000..6a3b956
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver bridge service parameters.
+ */
+package org.apache.reef.bridge.service.parameters;
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java
deleted file mode 100644
index dac1200..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.examples;
-
-import org.apache.reef.runtime.common.files.RuntimePathProvider;
-
-import javax.inject.Inject;
-/**
- * Supplies the java binary's path for HDInsight.
- */
-public final class WindowsRuntimePathProvider implements RuntimePathProvider {
-
- @Inject
- public WindowsRuntimePathProvider() {
- }
-
- @Override
- public String getPath() {
- return "java";
- }
-
- @Override
- public String toString() {
- return getPath();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java
index 020a0eb..79b2ee0 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java
@@ -20,6 +20,8 @@ package org.apache.reef.bridge.examples.hello;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Unit;
@@ -80,4 +82,26 @@ public final class HelloDriver {
allocatedEvaluator.submitTask(taskConfiguration);
}
}
+
+ /**
+ * Handles a completed task: logs the task id and closes the active context
+ * reported by the task.
+ */
+ public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+ @Override
+ public void onNext(final CompletedTask value) {
+ LOG.log(Level.INFO, "Completed task {0}", value.getId());
+ value.getActiveContext().close();
+ }
+ }
+
+ /**
+  * Handles a failed task: logs the task id and, when the failed task still
+  * reports an active context, closes that context.
+  */
+ public final class FailedTaskHandler implements EventHandler<FailedTask> {
+   @Override
+   public void onNext(final FailedTask value) {
+     LOG.log(Level.INFO, "Failed task {0}", value.getId());
+     // The active context is optional on a failed task; dereferencing it
+     // unconditionally would fail when no context survived the failure.
+     if (value.getActiveContext().isPresent()) {
+       value.getActiveContext().get().close();
+     }
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java
index 928828c..e80cf1e 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java
@@ -18,10 +18,9 @@
*/
package org.apache.reef.bridge.examples.hello;
-import org.apache.reef.bridge.client.DriverClientConfiguration;
+import org.apache.reef.bridge.driver.client.DriverClientConfiguration;
import org.apache.reef.bridge.proto.ClientProtocol;
-import org.apache.reef.bridge.service.DriverServiceLauncher;
-import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.bridge.client.DriverServiceLauncher;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.util.EnvironmentUtils;
@@ -43,6 +42,8 @@ public final class HelloREEF {
DriverClientConfiguration.CONF
.set(DriverClientConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
.set(DriverClientConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+ .set(DriverClientConfiguration.ON_TASK_COMPLETED, HelloDriver.CompletedTaskHandler.class)
+ .set(DriverClientConfiguration.ON_TASK_FAILED, HelloDriver.FailedTaskHandler.class)
.build();
/**
@@ -58,15 +59,10 @@ public final class HelloREEF {
builder.setLocalRuntime(ClientProtocol.LocalRuntimeParameters.newBuilder()
.setMaxNumberOfEvaluators(1)
.build());
- builder.addHandler(ClientProtocol.DriverClientConfiguration.Handlers.START);
- builder.addHandler(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_ALLOCATED);
builder.addGlobalLibraries(EnvironmentUtils.getClassLocation(HelloDriver.class));
- final LauncherStatus status =
- DriverServiceLauncher.submit(builder.build(), DRIVER_CONFIG);
-
- LOG.log(Level.INFO, "REEF job completed: {0}", status);
-
+ DriverServiceLauncher.submit(builder.build(), DRIVER_CONFIG);
+ LOG.log(Level.INFO, "REEF job completed");
ThreadLogger.logThreads(LOG, Level.FINE, "Threads running at the end of HelloREEF:");
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java
deleted file mode 100644
index 4f83fdb..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.service;
-
-/**
- * An exception thrown by the driver client.
- */
-public final class DriverClientException extends Exception {
-
- public DriverClientException(final String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java
deleted file mode 100644
index e3f74c8..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.service;
-
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.bridge.service.parameters.DriverClientCommand;
-import org.apache.reef.client.DriverConfiguration;
-import org.apache.reef.driver.parameters.DriverIdleSources;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.RequiredImpl;
-import org.apache.reef.tang.formats.RequiredParameter;
-
-/**
- * Binds all driver bridge service handlers to the driver.
- */
-@Private
-public final class DriverServiceConfiguration extends ConfigurationModuleBuilder {
-
- public static final RequiredImpl<IDriverService> DRIVER_SERVICE_IMPL = new RequiredImpl<>();
-
- public static final RequiredParameter<String> DRIVER_CLIENT_COMMAND = new RequiredParameter<>();
-
- /** Configuration module that binds all driver handlers. */
- public static final ConfigurationModule CONF = new DriverServiceConfiguration()
- .merge(DriverConfiguration.CONF)
- .bindImplementation(IDriverService.class, DRIVER_SERVICE_IMPL)
- .bindNamedParameter(DriverClientCommand.class, DRIVER_CLIENT_COMMAND)
- .bindSetEntry(DriverIdleSources.class, IDriverService.class)
- .build();
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java
deleted file mode 100644
index cca2436..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.service;
-
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.context.ContextMessage;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.CompletedEvaluator;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.driver.task.*;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
-import org.apache.reef.wake.time.event.StopTime;
-
-import javax.inject.Inject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Contains Java side event handlers that perform
- * hand-off with the driver client side.
- */
-@Unit
-public final class DriverServiceHandlers {
-
- private static final Logger LOG = Logger.getLogger(DriverServiceHandlers.class.getName());
-
- private final IDriverService driverBridgeService;
-
- @Inject
- private DriverServiceHandlers(
- final IDriverService driverBridgeService) {
- this.driverBridgeService = driverBridgeService;
- }
-
- /**
- * Job Driver is ready and the clock is set up: request the evaluators.
- */
- final class StartHandler implements EventHandler<StartTime> {
- @Override
- public void onNext(final StartTime startTime) {
- LOG.log(Level.INFO, "JavaBridge: Start Driver");
- DriverServiceHandlers.this.driverBridgeService.startHandler(startTime);
- }
- }
-
- /**
- * Job Driver is is shutting down: write to the log.
- */
- final class StopHandler implements EventHandler<StopTime> {
- @Override
- public void onNext(final StopTime stopTime) {
- LOG.log(Level.INFO, "JavaBridge: Stop Driver");
- DriverServiceHandlers.this.driverBridgeService.stopHandler(stopTime);
- }
- }
-
- /**
- * Receive notification that an Evaluator had been allocated,
- * and submitTask a new Task in that Evaluator.
- */
- final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
- @Override
- public void onNext(final AllocatedEvaluator eval) {
- LOG.log(Level.INFO, "JavaBridge: Allocated Evaluator {0}", eval.getId());
- DriverServiceHandlers.this.driverBridgeService.allocatedEvaluatorHandler(eval);
- }
- }
-
- final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
- @Override
- public void onNext(final CompletedEvaluator eval) {
- LOG.log(Level.INFO, "JavaBridge: Completed Evaluator {0}", eval.getId());
- DriverServiceHandlers.this.driverBridgeService.completedEvaluatorHandler(eval);
- }
- }
-
- final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
- @Override
- public void onNext(final FailedEvaluator eval) {
- LOG.log(Level.INFO, "JavaBridge: Failed Evaluator {0}", eval.getId());
- DriverServiceHandlers.this.driverBridgeService.failedEvaluatorHandler(eval);
- }
- }
-
- /**
- * Receive notification that the Context is active.
- */
- final class ActiveContextHandler implements EventHandler<ActiveContext> {
- @Override
- public void onNext(final ActiveContext context) {
- LOG.log(Level.INFO, "JavaBridge: Active Context {0}", context.getId());
- DriverServiceHandlers.this.driverBridgeService.activeContextHandler(context);
- }
- }
-
- /**
- * Received notification that the Context is closed.
- */
- final class ClosedContextHandler implements EventHandler<ClosedContext> {
- @Override
- public void onNext(final ClosedContext context) {
- LOG.log(Level.INFO, "JavaBridge: Closed Context {0}", context.getId());
- DriverServiceHandlers.this.driverBridgeService.closedContextHandler(context);
- }
- }
-
- /**
- * Received a message from the context.
- */
- final class ContextMessageHandler implements EventHandler<ContextMessage> {
- @Override
- public void onNext(final ContextMessage message) {
- LOG.log(Level.INFO, "JavaBridge: Context Message id {0}", message.getId());
- DriverServiceHandlers.this.driverBridgeService.contextMessageHandler(message);
- }
- }
-
- /**
- * Received notification that the Context failed.
- */
- final class ContextFailedHandler implements EventHandler<FailedContext> {
- @Override
- public void onNext(final FailedContext context) {
- LOG.log(Level.INFO, "JavaBridge: Context Failed {0}", context.getId());
- DriverServiceHandlers.this.driverBridgeService.failedContextHandler(context);
- }
- }
-
- /**
- * Receive notification that the Task is running.
- */
- final class RunningTaskHandler implements EventHandler<RunningTask> {
- @Override
- public void onNext(final RunningTask task) {
- LOG.log(Level.INFO, "JavaBridge: Running Task {0}", task.getId());
- DriverServiceHandlers.this.driverBridgeService.runningTaskHandler(task);
- }
- }
-
- /**
- * Received notification that the Task failed.
- */
- final class FailedTaskHandler implements EventHandler<FailedTask> {
- @Override
- public void onNext(final FailedTask task) {
- LOG.log(Level.INFO, "JavaBridge: Failed Task {0}", task.getId());
- DriverServiceHandlers.this.driverBridgeService.failedTaskHandler(task);
- }
- }
-
- /**
- * Receive notification that the Task has completed successfully.
- */
- final class CompletedTaskHandler implements EventHandler<CompletedTask> {
- @Override
- public void onNext(final CompletedTask task) {
- LOG.log(Level.INFO, "JavaBridge: Completed Task {0}", task.getId());
- DriverServiceHandlers.this.driverBridgeService.completedTaskHandler(task);
- }
- }
-
- /**
- * Received notification that the Task was suspended.
- */
- final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
- @Override
- public void onNext(final SuspendedTask task) {
- LOG.log(Level.INFO, "JavaBridge: Suspended Task {0}", task.getId());
- DriverServiceHandlers.this.driverBridgeService.suspendedTaskHandler(task);
- }
- }
-
- /**
- * Received a message from the task.
- */
- final class TaskMessageHandler implements EventHandler<TaskMessage> {
- @Override
- public void onNext(final TaskMessage message) {
- LOG.log(Level.INFO, "JavaBridge: Message from Task {0}", message.getId());
- DriverServiceHandlers.this.driverBridgeService.taskMessageHandler(message);
- }
- }
-
- /**
- * Received a message from the client.
- */
- final class ClientMessageHandler implements EventHandler<byte[]> {
- @Override
- public void onNext(final byte[] message) {
- LOG.log(Level.INFO, "JavaBridge: Message from Client");
- DriverServiceHandlers.this.driverBridgeService.clientMessageHandler(message);
- }
- }
-
- /**
- * Received a close event from the client.
- */
- final class ClientCloseHandler implements EventHandler<Void> {
- @Override
- public void onNext(final Void value) {
- LOG.log(Level.INFO, "JavaBridge: Close event from Client");
- DriverServiceHandlers.this.driverBridgeService.clientCloseHandler();
- }
- }
-
- /**
- * Received a close event with message.
- */
- final class ClientCloseWithMessageHandler implements EventHandler<byte[]> {
- @Override
- public void onNext(final byte[] message) {
- LOG.log(Level.INFO, "JavaBridge: Close event with messages from Client");
- DriverServiceHandlers.this.driverBridgeService.clientCloseWithMessageHandler(message);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java
deleted file mode 100644
index fbafff9..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.service;
-
-import com.google.protobuf.util.JsonFormat;
-import org.apache.commons.lang.StringUtils;
-import org.apache.reef.bridge.client.JavaDriverClientLauncher;
-import org.apache.reef.bridge.examples.WindowsRuntimePathProvider;
-import org.apache.reef.bridge.proto.ClientProtocol;
-import org.apache.reef.bridge.service.grpc.GRPCDriverService;
-import org.apache.reef.client.DriverConfiguration;
-import org.apache.reef.client.DriverLauncher;
-import org.apache.reef.client.LauncherStatus;
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
-import org.apache.reef.io.TcpPortConfigurationProvider;
-import org.apache.reef.runtime.common.files.ClasspathProvider;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.common.files.RuntimePathProvider;
-import org.apache.reef.runtime.common.files.UnixJVMPathProvider;
-import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
-import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
-import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
-import org.apache.reef.tang.*;
-import org.apache.reef.tang.exceptions.BindException;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
-import org.apache.reef.util.EnvironmentUtils;
-import org.apache.reef.util.OSUtils;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Driver Service Launcher - main class.
- */
-public final class DriverServiceLauncher {
-
- /**
- * Standard Java logger.
- */
- private static final Logger LOG = Logger.getLogger(DriverServiceLauncher.class.getName());
-
- /**
- * This class should not be instantiated.
- */
- private DriverServiceLauncher() {
- throw new RuntimeException("Do not instantiate this class!");
- }
-
- /**
- * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
- *
- * @param driverClientConfigurationProto containing which runtime to configure: local, yarn, azbatch
- * @return (immutable) TANG Configuration object.
- * @throws BindException if configuration commandLineInjector fails.
- * @throws InjectionException if configuration commandLineInjector fails.
- */
- private static Configuration getRuntimeConfiguration(
- final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
- throws BindException {
- switch (driverClientConfigurationProto.getRuntimeCase()) {
- case LOCAL_RUNTIME:
- return getLocalRuntimeConfiguration(driverClientConfigurationProto);
- case YARN_RUNTIME:
- return getYarnRuntimeConfiguration(driverClientConfigurationProto);
- default:
- throw new IllegalArgumentException("Unsupported runtime " + driverClientConfigurationProto.getRuntimeCase());
- }
- }
-
- private static Configuration getLocalRuntimeConfiguration(
- final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
- throws BindException {
- LOG.log(Level.FINE, "JavaBridge: Running on the local runtime");
- return LocalRuntimeConfiguration.CONF
- .build();
- }
-
- private static Configuration getYarnRuntimeConfiguration(
- final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
- throws BindException {
- LOG.log(Level.FINE, "JavaBridge: Running on YARN");
- return YarnClientConfiguration.CONF.build();
- }
-
- private static Configuration getDriverServiceConfiguration(
- final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) {
- // Set required parameters
- ConfigurationModule driverServiceConfigurationModule = DriverServiceConfiguration.CONF
- .set(DriverServiceConfiguration.DRIVER_SERVICE_IMPL, GRPCDriverService.class)
- .set(DriverServiceConfiguration.DRIVER_CLIENT_COMMAND,
- driverClientConfigurationProto.getDriverClientLaunchCommand())
- .set(DriverConfiguration.DRIVER_IDENTIFIER, driverClientConfigurationProto.getJobid());
-
- // Set file dependencies
- final List<String> localLibraries = new ArrayList<>();
- localLibraries.add(EnvironmentUtils.getClassLocation(GRPCDriverService.class));
- if (driverClientConfigurationProto.getLocalLibrariesCount() > 0) {
- localLibraries.addAll(driverClientConfigurationProto.getLocalLibrariesList());
- }
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .setMultiple(DriverConfiguration.LOCAL_LIBRARIES, localLibraries);
- if (driverClientConfigurationProto.getGlobalLibrariesCount() > 0) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES,
- driverClientConfigurationProto.getGlobalLibrariesList());
- }
- if (driverClientConfigurationProto.getLocalFilesCount() > 0) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .setMultiple(DriverConfiguration.LOCAL_FILES,
- driverClientConfigurationProto.getLocalFilesList());
- }
- if (driverClientConfigurationProto.getGlobalFilesCount() > 0) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .setMultiple(DriverConfiguration.GLOBAL_FILES,
- driverClientConfigurationProto.getGlobalFilesList());
- }
- // Setup driver resources
- if (driverClientConfigurationProto.getCpuCores() > 0) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.DRIVER_CPU_CORES, driverClientConfigurationProto.getCpuCores());
- }
- if (driverClientConfigurationProto.getMemoryMb() > 0) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.DRIVER_MEMORY, driverClientConfigurationProto.getMemoryMb());
- }
-
- // Setup handlers
- final Set<ClientProtocol.DriverClientConfiguration.Handlers> handlerLabelSet = new HashSet<>();
- handlerLabelSet.addAll(driverClientConfigurationProto.getHandlerList());
- if (!handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.START.START)) {
- throw new IllegalArgumentException("Start handler required");
- } else {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_DRIVER_STARTED, DriverServiceHandlers.StartHandler.class)
- .set(DriverConfiguration.ON_DRIVER_STOP, DriverServiceHandlers.StopHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_ALLOCATED)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverServiceHandlers.AllocatedEvaluatorHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_COMPLETED)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, DriverServiceHandlers.CompletedEvaluatorHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_FAILED)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_EVALUATOR_FAILED, DriverServiceHandlers.FailedEvaluatorHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_ACTIVE)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_CONTEXT_ACTIVE, DriverServiceHandlers.ActiveContextHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_CLOSED)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_CONTEXT_CLOSED, DriverServiceHandlers.ClosedContextHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_FAILED)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_CONTEXT_FAILED, DriverServiceHandlers.ContextFailedHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_MESSAGE)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_CONTEXT_MESSAGE, DriverServiceHandlers.ContextMessageHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_RUNNING)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_TASK_RUNNING, DriverServiceHandlers.RunningTaskHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_COMPLETED)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_TASK_COMPLETED, DriverServiceHandlers.CompletedTaskHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_FAILED)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_TASK_FAILED, DriverServiceHandlers.FailedTaskHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_MESSAGE)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_TASK_MESSAGE, DriverServiceHandlers.TaskMessageHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_MESSAGE)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverServiceHandlers.ClientMessageHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_CLOSE)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_CLIENT_CLOSED, DriverServiceHandlers.ClientCloseHandler.class);
- }
- if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_CLOSE_WITH_MESSAGE)) {
- driverServiceConfigurationModule = driverServiceConfigurationModule
- .set(DriverConfiguration.ON_CLIENT_CLOSED_MESSAGE, DriverServiceHandlers.ClientCloseWithMessageHandler.class);
- }
-
- return setTcpPortRange(driverClientConfigurationProto, driverServiceConfigurationModule.build());
- }
-
- private static Configuration setTcpPortRange(
- final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto,
- final Configuration driverServiceConfiguration) {
- JavaConfigurationBuilder configurationModuleBuilder =
- Tang.Factory.getTang().newConfigurationBuilder(driverServiceConfiguration)
- .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class);
- // Setup TCP constraints
- if (driverClientConfigurationProto.getTcpPortRangeBegin() > 0) {
- configurationModuleBuilder = configurationModuleBuilder
- .bindNamedParameter(TcpPortRangeBegin.class,
- Integer.toString(driverClientConfigurationProto.getTcpPortRangeBegin()));
- }
- if (driverClientConfigurationProto.getTcpPortRangeCount() > 0) {
- configurationModuleBuilder = configurationModuleBuilder
- .bindNamedParameter(TcpPortRangeCount.class,
- Integer.toString(driverClientConfigurationProto.getTcpPortRangeCount()));
- }
- if (driverClientConfigurationProto.getTcpPortRangeTryCount() > 0) {
- configurationModuleBuilder = configurationModuleBuilder
- .bindNamedParameter(TcpPortRangeCount.class,
- Integer.toString(driverClientConfigurationProto.getTcpPortRangeTryCount()));
- }
- return configurationModuleBuilder.build();
- }
-
- public static LauncherStatus submit(
- final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto,
- final Configuration driverClientConfiguration)
- throws InjectionException, IOException {
- ClientProtocol.DriverClientConfiguration.Builder builder =
- ClientProtocol.DriverClientConfiguration.newBuilder(driverClientConfigurationProto);
- final File driverClientConfigurationFile = new File("driverclient.conf");
- try {
- // Write driver client configuration to a file
- final Injector driverClientInjector = Tang.Factory.getTang().newInjector(driverClientConfiguration);
- final ConfigurationSerializer configurationSerializer =
- driverClientInjector.getInstance(ConfigurationSerializer.class);
- configurationSerializer.toFile(driverClientConfiguration, driverClientConfigurationFile);
-
- // Get runtime injector and piece together the launch command based on its classpath info
- final Configuration runtimeConfiguration = getRuntimeConfiguration(driverClientConfigurationProto);
- // Resolve OS Runtime Path Provider
- final Configuration runtimeOSConfiguration = Configurations.merge(
- Tang.Factory.getTang().newConfigurationBuilder()
- .bind(RuntimePathProvider.class,
- OSUtils.isWindows() ? WindowsRuntimePathProvider.class : UnixJVMPathProvider.class)
- .build(),
- runtimeConfiguration);
- final Injector runtimeInjector = Tang.Factory.getTang().newInjector(runtimeOSConfiguration);
- final REEFFileNames fileNames = runtimeInjector.getInstance(REEFFileNames.class);
- final ClasspathProvider classpathProvider = runtimeInjector.getInstance(ClasspathProvider.class);
- final RuntimePathProvider runtimePathProvider = runtimeInjector.getInstance(RuntimePathProvider.class);
- final List<String> launchCommand = new JavaLaunchCommandBuilder(JavaDriverClientLauncher.class, null)
- .setConfigurationFilePaths(
- Collections.singletonList("./" + fileNames.getLocalFolderPath() + "/" +
- driverClientConfigurationFile.getName()))
- .setJavaPath(runtimePathProvider.getPath())
- .setClassPath(classpathProvider.getEvaluatorClasspath())
- .build();
- final String cmd = StringUtils.join(launchCommand, ' ');
- builder.setDriverClientLaunchCommand(cmd);
- builder.addLocalFiles(driverClientConfigurationFile.getAbsolutePath());
-
-
-
- // Configure driver service and launch the job
- final Configuration driverServiceConfiguration = getDriverServiceConfiguration(builder.build());
- return DriverLauncher.getLauncher(runtimeOSConfiguration).run(driverServiceConfiguration);
- } finally {
- driverClientConfigurationFile.delete();
- }
- }
-
- /**
- * Main method that launches the REEF job.
- *
- * @param args command line parameters.
- */
- public static void main(final String[] args) {
- try {
- if (args.length != 1) {
- LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() +
- " accepts single argument referencing a file that contains a client protocol buffer driver configuration");
- }
- final String content;
- try {
- content = new String(Files.readAllBytes(Paths.get(args[0])));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder =
- ClientProtocol.DriverClientConfiguration.newBuilder();
- JsonFormat.parser()
- .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry())
- .merge(content, driverClientConfigurationProtoBuilder);
- final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto =
- driverClientConfigurationProtoBuilder.build();
-
- final Configuration runtimeConfig = getRuntimeConfiguration(driverClientConfigurationProto);
- final Configuration driverConfig = getDriverServiceConfiguration(driverClientConfigurationProto);
- DriverLauncher.getLauncher(runtimeConfig).run(driverConfig);
- LOG.log(Level.INFO, "JavaBridge: Stop Client {0}", driverClientConfigurationProto.getJobid());
- } catch (final BindException | InjectionException | IOException ex) {
- LOG.log(Level.SEVERE, "Job configuration error", ex);
- }
- }
-}