You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/05/01 20:36:34 UTC

[2/6] reef git commit: [REEF-2003] Revise Driver Service to allow static configuration.

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
new file mode 100644
index 0000000..ad7eaa8
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
@@ -0,0 +1,919 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.driver.service.grpc;
+
+import com.google.protobuf.ByteString;
+import io.grpc.*;
+import io.grpc.stub.StreamObserver;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.reef.bridge.driver.service.DriverClientException;
+import org.apache.reef.bridge.driver.service.IDriverService;
+import org.apache.reef.bridge.service.parameters.DriverClientCommand;
+import org.apache.reef.bridge.proto.*;
+import org.apache.reef.bridge.proto.Void;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
+import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
+import org.apache.reef.runtime.common.driver.idle.IdleMessage;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.OSUtils;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * GRPC DriverBridgeService that interacts with higher-level languages.
+ */
+public final class GRPCDriverService implements IDriverService {
+  private static final Logger LOG = Logger.getLogger(GRPCDriverService.class.getName());
+
+  private static final Void VOID = Void.newBuilder().build();
+
+  private Server server;
+
+  private Process driverProcess;
+
+  private DriverClientGrpc.DriverClientFutureStub clientStub;
+
+  private final Clock clock;
+
+  private final ConfigurationSerializer configurationSerializer;
+
+  private final EvaluatorRequestor evaluatorRequestor;
+
+  private final JVMProcessFactory jvmProcessFactory;
+
+  private final CLRProcessFactory clrProcessFactory;
+
+  private final TcpPortProvider tcpPortProvider;
+
+  private final String driverClientCommand;
+
+  private final Map<String, AllocatedEvaluator> allocatedEvaluatorMap = new HashMap<>();
+
+  private final Map<String, ActiveContext> activeContextMap = new HashMap<>();
+
+  private final Map<String, RunningTask> runningTaskMap = new HashMap<>();
+
+  private boolean stopped = false;
+
+  @Inject
+  private GRPCDriverService(
+      final Clock clock,
+      final EvaluatorRequestor evaluatorRequestor,
+      final ConfigurationSerializer configurationSerializer,
+      final JVMProcessFactory jvmProcessFactory,
+      final CLRProcessFactory clrProcessFactory,
+      final TcpPortProvider tcpPortProvider,
+      @Parameter(DriverClientCommand.class) final String driverClientCommand) {
+    this.clock = clock;
+    this.configurationSerializer = configurationSerializer;
+    this.jvmProcessFactory = jvmProcessFactory;
+    this.clrProcessFactory = clrProcessFactory;
+    this.evaluatorRequestor = evaluatorRequestor;
+    this.driverClientCommand = driverClientCommand;
+    this.tcpPortProvider = tcpPortProvider;
+  }
+
+  private void start() throws IOException, InterruptedException {
+    for (final Integer port : this.tcpPortProvider) {
+      try {
+        this.server = ServerBuilder.forPort(port)
+            .addService(new DriverBridgeServiceImpl())
+            .build()
+            .start();
+        LOG.info("Server started, listening on " + port);
+        break;
+      } catch (IOException e) {
+        LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port);
+      }
+    }
+    if (this.server == null || this.server.isTerminated()) {
+      throw new IOException("Unable to start gRPC server");
+    } else {
+      final String cmd = this.driverClientCommand + " " + this.server.getPort();
+      final String cmdOs = OSUtils.isWindows() ? "cmd.exe /c \"" + cmd + "\"" : cmd;
+      final String cmdStd = cmdOs + " 1> driverclient.stdout 2> driverclient.stderr";
+      this.driverProcess = Runtime.getRuntime().exec(cmdStd);
+      synchronized (this) {
+        // wait for driver client process to register
+        while (this.clientStub == null && driverProcessIsAlive()) {
+          this.wait(1000); // a second
+        }
+      }
+      if (driverProcessIsAlive()) {
+        Thread closeChildThread = new Thread() {
+          public void run() {
+            synchronized (GRPCDriverService.this) {
+              if (GRPCDriverService.this.driverProcess != null) {
+                GRPCDriverService.this.driverProcess.destroy();
+                GRPCDriverService.this.driverProcess = null;
+              }
+            }
+          }
+        };
+        Runtime.getRuntime().addShutdownHook(closeChildThread);
+      }
+    }
+  }
+
+  private void stop() {
+    stop(null);
+  }
+
+  private void stop(final Throwable t) {
+    if (!stopped) {
+      try {
+        if (t != null) {
+          clock.stop(t);
+        } else {
+          clock.stop();
+        }
+        if (server != null) {
+          LOG.log(Level.INFO, "Shutdown gRPC");
+          this.server.shutdown();
+          this.server = null;
+        }
+        if (this.driverProcess != null) {
+          LOG.log(Level.INFO, "Shutdown driver process");
+          this.driverProcess.destroy();
+          this.driverProcess = null;
+        }
+      } finally {
+        stopped = true;
+      }
+    }
+  }
+
+  /**
+   * Await termination on the main thread since the grpc library uses daemon threads.
+   */
+  private void blockUntilShutdown() throws InterruptedException {
+    if (server != null) {
+      server.awaitTermination();
+    }
+  }
+
+  /**
+   * Determines if the driver process is still alive by
+   * testing for its exit value, which throws {@link IllegalThreadStateException}
+   * if process is still running.
+   * @return true if driver process is alive, false otherwise
+   */
+  private boolean driverProcessIsAlive() {
+    if (this.driverProcess != null) {
+      try {
+        this.driverProcess.exitValue();
+      } catch (IllegalThreadStateException e) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private EvaluatorDescriptorInfo toEvaluatorDescriptorInfo(final EvaluatorDescriptor descriptor) {
+    if (descriptor == null) {
+      return null;
+    } else {
+      return EvaluatorDescriptorInfo.newBuilder()
+          .setCores(descriptor.getNumberOfCores())
+          .setMemory(descriptor.getMemory())
+          .setRuntimeName(descriptor.getRuntimeName())
+          .build();
+    }
+  }
+
+  @Override
+  public IdleMessage getIdleStatus() {
+    final String componentName = "Java Bridge DriverService";
+    if (this.clientStub != null) {
+      try {
+        final IdleStatus idleStatus = this.clientStub.idlenessCheckHandler(VOID).get();
+        LOG.log(Level.INFO, "is idle: " + idleStatus.getIsIdle());
+        return new IdleMessage(
+            componentName,
+            idleStatus.getReason(),
+            idleStatus.getIsIdle());
+      } catch (ExecutionException | InterruptedException e) {
+        stop(e);
+      }
+    }
+    return new IdleMessage(
+        componentName,
+        "stub not initialized",
+        true);
+  }
+
+  @Override
+  public void startHandler(final StartTime startTime) {
+    try {
+      start();
+      synchronized (this) {
+        if (this.clientStub != null) {
+          this.clientStub.startHandler(
+              StartTimeInfo.newBuilder().setStartTime(startTime.getTimestamp()).build());
+        } else {
+          stop(new IllegalStateException("Unable to start driver client"));
+        }
+      }
+    } catch (IOException | InterruptedException e) {
+      stop(e);
+    }
+  }
+
+  @Override
+  public void stopHandler(final StopTime stopTime) {
+    synchronized (this) {
+      try {
+        if (clientStub != null) {
+          this.clientStub.stopHandler(
+              StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build());
+        }
+      } finally {
+        stop();
+      }
+    }
+
+  }
+
+  @Override
+  public void allocatedEvaluatorHandler(final AllocatedEvaluator eval) {
+    synchronized (this) {
+      this.allocatedEvaluatorMap.put(eval.getId(), eval);
+      this.clientStub.allocatedEvaluatorHandler(
+          EvaluatorInfo.newBuilder()
+              .setEvaluatorId(eval.getId())
+              .setDescriptorInfo(toEvaluatorDescriptorInfo(eval.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void completedEvaluatorHandler(final CompletedEvaluator eval) {
+    synchronized (this) {
+      this.allocatedEvaluatorMap.remove(eval.getId());
+      this.clientStub.completedEvaluatorHandler(
+          EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build());
+    }
+  }
+
+  @Override
+  public void failedEvaluatorHandler(final FailedEvaluator eval) {
+    synchronized (this) {
+      this.allocatedEvaluatorMap.remove(eval.getId());
+      this.clientStub.failedEvaluatorHandler(
+          EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build());
+    }
+  }
+
+  @Override
+  public void activeContextHandler(final ActiveContext context) {
+    synchronized (this) {
+      this.activeContextMap.put(context.getId(), context);
+      this.clientStub.activeContextHandler(
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(
+                  context.getParentId().isPresent() ?
+                      context.getParentId().get() : null)
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void closedContextHandler(final ClosedContext context) {
+    synchronized (this) {
+      this.activeContextMap.remove(context.getId());
+      this.clientStub.closedContextHandler(
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(context.getParentContext().getId())
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void failedContextHandler(final FailedContext context) {
+    synchronized (this) {
+      this.activeContextMap.remove(context.getId());
+      this.clientStub.closedContextHandler(
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(
+                  context.getParentContext().isPresent() ?
+                      context.getParentContext().get().getId() : null)
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void contextMessageHandler(final ContextMessage message) {
+    synchronized (this) {
+      this.clientStub.contextMessageHandler(
+          ContextMessageInfo.newBuilder()
+              .setContextId(message.getId())
+              .setMessageSourceId(message.getMessageSourceID())
+              .setSequenceNumber(message.getSequenceNumber())
+              .setPayload(ByteString.copyFrom(message.get()))
+              .build());
+    }
+  }
+
+  @Override
+  public void runningTaskHandler(final RunningTask task) {
+    synchronized (this) {
+      final ActiveContext context = task.getActiveContext();
+      if (!this.activeContextMap.containsKey(context.getId())) {
+        this.activeContextMap.put(context.getId(), context);
+      }
+      this.runningTaskMap.put(task.getId(), task);
+      this.clientStub.runningTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(ContextInfo.newBuilder()
+                  .setContextId(context.getId())
+                  .setEvaluatorId(context.getEvaluatorId())
+                  .setParentId(context.getParentId().isPresent() ? context.getParentId().get() : "")
+                  .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                      task.getActiveContext().getEvaluatorDescriptor()))
+                  .build())
+              .build());
+    }
+  }
+
+  @Override
+  public void failedTaskHandler(final FailedTask task) {
+    synchronized (this) {
+      if (task.getActiveContext().isPresent() &&
+          !this.activeContextMap.containsKey(task.getActiveContext().get().getId())) {
+        this.activeContextMap.put(task.getActiveContext().get().getId(), task.getActiveContext().get());
+      }
+      this.runningTaskMap.remove(task.getId());
+      this.clientStub.failedTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(task.getActiveContext().isPresent() ?
+                      ContextInfo.newBuilder()
+                          .setContextId(task.getActiveContext().get().getId())
+                          .setEvaluatorId(task.getActiveContext().get().getEvaluatorId())
+                          .setParentId(task.getActiveContext().get().getParentId().isPresent() ?
+                              task.getActiveContext().get().getParentId().get() : "")
+                          .build() : null)
+              .build());
+    }
+  }
+
+  @Override
+  public void completedTaskHandler(final CompletedTask task) {
+    synchronized (this) {
+      if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) {
+        this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext());
+      }
+      this.runningTaskMap.remove(task.getId());
+      this.clientStub.completedTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(ContextInfo.newBuilder()
+                  .setContextId(task.getActiveContext().getId())
+                  .setEvaluatorId(task.getActiveContext().getEvaluatorId())
+                  .setParentId(task.getActiveContext().getParentId().isPresent() ?
+                      task.getActiveContext().getParentId().get() : "")
+                  .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                      task.getActiveContext().getEvaluatorDescriptor()))
+                  .build())
+              .build());
+    }
+  }
+
+  @Override
+  public void suspendedTaskHandler(final SuspendedTask task) {
+    synchronized (this) {
+      if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) {
+        this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext());
+      }
+      this.runningTaskMap.remove(task.getId());
+      this.clientStub.suspendedTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(ContextInfo.newBuilder()
+                  .setContextId(task.getActiveContext().getId())
+                  .setEvaluatorId(task.getActiveContext().getEvaluatorId())
+                  .setParentId(task.getActiveContext().getParentId().isPresent() ?
+                      task.getActiveContext().getParentId().get() : "")
+                  .build())
+              .build());
+    }
+  }
+
+  @Override
+  public void taskMessageHandler(final TaskMessage message) {
+    synchronized (this) {
+      this.clientStub.taskMessageHandler(
+          TaskMessageInfo.newBuilder()
+              .setTaskId(message.getId())
+              .setContextId(message.getContextId())
+              .setMessageSourceId(message.getMessageSourceID())
+              .setSequenceNumber(message.getSequenceNumber())
+              .setPayload(ByteString.copyFrom(message.get()))
+              .build());
+    }
+  }
+
+  @Override
+  public void clientMessageHandler(final byte[] message) {
+    synchronized (this) {
+      this.clientStub.clientMessageHandler(
+          ClientMessageInfo.newBuilder()
+              .setPayload(ByteString.copyFrom(message))
+              .build());
+    }
+  }
+
+  @Override
+  public void clientCloseHandler() {
+    synchronized (this) {
+      this.clientStub.clientCloseHandler(VOID);
+    }
+  }
+
+  @Override
+  public void clientCloseWithMessageHandler(final byte[] message) {
+    synchronized (this) {
+      this.clientStub.clientCloseWithMessageHandler(
+          ClientMessageInfo.newBuilder()
+              .setPayload(ByteString.copyFrom(message))
+              .build());
+    }
+  }
+
+  @Override
+  public void driverRestarted(final DriverRestarted restart) {
+    try {
+      start();
+      synchronized (this) {
+        if (this.clientStub != null) {
+          this.clientStub.driverRestartHandler(DriverRestartInfo.newBuilder()
+              .setResubmissionAttempts(restart.getResubmissionAttempts())
+              .setStartTime(StartTimeInfo.newBuilder()
+                  .setStartTime(restart.getStartTime().getTimestamp()).build())
+              .addAllExpectedEvaluatorIds(restart.getExpectedEvaluatorIds())
+              .build());
+        } else {
+          stop(new DriverClientException("Failed to restart driver client"));
+        }
+      }
+    } catch (InterruptedException | IOException e) {
+      stop(e);
+    }
+  }
+
+  @Override
+  public void restartRunningTask(final RunningTask task) {
+    synchronized (this) {
+      final ActiveContext context = task.getActiveContext();
+      if (!this.activeContextMap.containsKey(context.getId())) {
+        this.activeContextMap.put(context.getId(), context);
+      }
+      this.runningTaskMap.put(task.getId(), task);
+      this.clientStub.driverRestartRunningTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(ContextInfo.newBuilder()
+                  .setContextId(context.getId())
+                  .setEvaluatorId(context.getEvaluatorId())
+                  .setParentId(context.getParentId().isPresent() ? context.getParentId().get() : "")
+                  .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(context.getEvaluatorDescriptor()))
+                  .build())
+              .build());
+    }
+  }
+
+  @Override
+  public void restartActiveContext(final ActiveContext context) {
+    synchronized (this) {
+      this.activeContextMap.put(context.getId(), context);
+      this.clientStub.driverRestartActiveContextHandler(
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(
+                  context.getParentId().isPresent() ?
+                      context.getParentId().get() : null)
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void driverRestartCompleted(final DriverRestartCompleted restartCompleted) {
+    synchronized (this) {
+      this.clientStub.driverRestartCompletedHandler(DriverRestartCompletedInfo.newBuilder()
+          .setCompletionTime(StopTimeInfo.newBuilder()
+              .setStopTime(restartCompleted.getCompletedTime().getTimestamp()).build())
+          .setIsTimedOut(restartCompleted.isTimedOut())
+          .build());
+    }
+  }
+
+  @Override
+  public void restartFailedEvalautor(final FailedEvaluator evaluator) {
+    synchronized (this) {
+      this.clientStub.driverRestartFailedEvaluatorHandler(EvaluatorInfo.newBuilder()
+          .setEvaluatorId(evaluator.getId())
+          .setFailure(EvaluatorInfo.FailureInfo.newBuilder()
+              .setMessage(evaluator.getEvaluatorException() != null ?
+                  evaluator.getEvaluatorException().getMessage() : "unknown failure during restart")
+              .build())
+          .build());
+    }
+  }
+
+  private final class DriverBridgeServiceImpl
+      extends DriverServiceGrpc.DriverServiceImplBase {
+
+    @Override
+    public void registerDriverClient(
+        final DriverClientRegistration request,
+        final StreamObserver<Void> responseObserver) {
+      try {
+        final ManagedChannel channel = ManagedChannelBuilder
+            .forAddress(request.getHost(), request.getPort())
+            .usePlaintext(true)
+            .build();
+        synchronized (GRPCDriverService.this) {
+          GRPCDriverService.this.clientStub = DriverClientGrpc.newFutureStub(channel);
+          GRPCDriverService.this.notifyAll();
+        }
+      } finally {
+        responseObserver.onNext(null);
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void requestResources(
+        final ResourceRequest request,
+        final StreamObserver<Void> responseObserver) {
+      try {
+        synchronized (GRPCDriverService.this) {
+          EvaluatorRequest.Builder requestBuilder = GRPCDriverService.this.evaluatorRequestor.newRequest();
+          requestBuilder.setNumber(request.getResourceCount());
+          requestBuilder.setNumberOfCores(request.getCores());
+          requestBuilder.setMemory(request.getMemorySize());
+          requestBuilder.setRelaxLocality(request.getRelaxLocality());
+          requestBuilder.setRuntimeName(request.getRuntimeName());
+          if (request.getNodeNameListCount() > 0) {
+            requestBuilder.addNodeNames(request.getNodeNameListList());
+          }
+          if (request.getRackNameListCount() > 0) {
+            for (final String rackName : request.getRackNameListList()) {
+              requestBuilder.addRackName(rackName);
+            }
+          }
+          GRPCDriverService.this.evaluatorRequestor.submit(requestBuilder.build());
+        }
+      } finally {
+        responseObserver.onNext(null);
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void shutdown(
+        final ShutdownRequest request,
+        final StreamObserver<Void> responseObserver) {
+      try {
+        synchronized (GRPCDriverService.this) {
+          if (request.getException() != null) {
+            GRPCDriverService.this.clock.stop(
+                new DriverClientException(request.getException().getMessage()));
+          } else {
+            GRPCDriverService.this.clock.stop();
+          }
+        }
+      } finally {
+        responseObserver.onNext(null);
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void setAlarm(
+        final AlarmRequest request,
+        final StreamObserver<Void> responseObserver) {
+      try {
+        synchronized (GRPCDriverService.this) {
+          GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler<Alarm>() {
+            @Override
+            public void onNext(final Alarm value) {
+              synchronized (GRPCDriverService.this) {
+                GRPCDriverService.this.clientStub.alarmTrigger(
+                    AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build());
+              }
+            }
+          });
+        }
+      } finally {
+        responseObserver.onNext(null);
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void allocatedEvaluatorOp(
+        final AllocatedEvaluatorRequest request,
+        final StreamObserver<Void> responseObserver) {
+      try {
+        if (request.getEvaluatorConfiguration() == null) {
+          responseObserver.onError(Status.INTERNAL
+              .withDescription("Evaluator configuration required")
+              .asRuntimeException());
+        } else if (request.getContextConfiguration() == null && request.getTaskConfiguration() == null) {
+          responseObserver.onError(Status.INTERNAL
+              .withDescription("Context and/or Task configuration required")
+              .asRuntimeException());
+        } else {
+          synchronized (GRPCDriverService.this) {
+            if (!GRPCDriverService.this.allocatedEvaluatorMap.containsKey(request.getEvaluatorId())) {
+              responseObserver.onError(Status.INTERNAL
+                  .withDescription("Unknown allocated evaluator " + request.getEvaluatorId())
+                  .asRuntimeException());
+            }
+            final AllocatedEvaluator evaluator =
+                GRPCDriverService.this.allocatedEvaluatorMap.get(request.getEvaluatorId());
+            if (request.getCloseEvaluator()) {
+              evaluator.close();
+            } else {
+              if (request.getAddFilesCount() > 0) {
+                for (final String file : request.getAddFilesList()) {
+                  evaluator.addFile(new File(file));
+                }
+              }
+              if (request.getAddLibrariesCount() > 0) {
+                for (final String library : request.getAddLibrariesList()) {
+                  evaluator.addLibrary(new File(library));
+                }
+              }
+              if (request.getSetProcess() != null) {
+                final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest =
+                    request.getSetProcess();
+                switch (evaluator.getEvaluatorDescriptor().getProcess().getType()) {
+                case JVM:
+                  setJVMProcess(evaluator, processRequest);
+                  break;
+                case CLR:
+                  setCLRProcess(evaluator, processRequest);
+                  break;
+                default:
+                  throw new RuntimeException("Unknown evaluator process type");
+                }
+              }
+              if (StringUtils.isEmpty(request.getEvaluatorConfiguration())) {
+                // Assume that we are running Java driver client, but this assumption could be a bug so log a warning
+                LOG.log(Level.WARNING, "No evaluator configuration detected. Assuming a Java driver client.");
+                if (request.getContextConfiguration() != null && request.getTaskConfiguration() != null) {
+                  // submit context and task
+                  Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set");
+                  Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set");
+                  try {
+                    evaluator.submitContextAndTask(
+                        configurationSerializer.fromString(request.getContextConfiguration()),
+                        configurationSerializer.fromString(request.getTaskConfiguration()));
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                } else if (request.getContextConfiguration() != null) {
+                  // submit context
+                  Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set");
+                  try {
+                    evaluator.submitContext(configurationSerializer.fromString(request.getContextConfiguration()));
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                } else if (request.getTaskConfiguration() != null) {
+                  // submit task
+                  Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set");
+                  try {
+                    evaluator.submitTask(configurationSerializer.fromString(request.getTaskConfiguration()));
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                } else {
+                  throw new RuntimeException("Missing check for required evaluator configurations");
+                }
+              } else {
+                if (request.getContextConfiguration() != null && request.getTaskConfiguration() != null) {
+                  // submit context and task
+                  Validate.notEmpty(request.getEvaluatorConfiguration(), "Evaluator configuration not set");
+                  Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set");
+                  Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set");
+                  ((AllocatedEvaluatorImpl) evaluator).submitContextAndTask(
+                      request.getEvaluatorConfiguration(),
+                      request.getContextConfiguration(),
+                      request.getTaskConfiguration());
+                } else if (request.getContextConfiguration() != null) {
+                  // submit context
+                  Validate.notEmpty(request.getEvaluatorConfiguration(), "Evaluator configuration not set");
+                  Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set");
+                  ((AllocatedEvaluatorImpl) evaluator).submitContext(
+                      request.getEvaluatorConfiguration(),
+                      request.getContextConfiguration());
+                } else if (request.getTaskConfiguration() != null) {
+                  // submit task
+                  Validate.notEmpty(request.getEvaluatorConfiguration(), "Evaluator configuration not set");
+                  Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set");
+                  ((AllocatedEvaluatorImpl) evaluator).submitTask(
+                      request.getEvaluatorConfiguration(),
+                      request.getTaskConfiguration());
+                } else {
+                  throw new RuntimeException("Missing check for required evaluator configurations");
+                }
+              }
+            }
+          }
+        }
+      } finally {
+        responseObserver.onNext(null);
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void activeContextOp(
+        final ActiveContextRequest request,
+        final StreamObserver<Void> responseObserver) {
+      synchronized (GRPCDriverService.this) {
+        if (!GRPCDriverService.this.activeContextMap.containsKey(request.getContextId())) {
+          LOG.log(Level.SEVERE, "Context does not exist with id " + request.getContextId());
+          responseObserver.onError(Status.INTERNAL
+              .withDescription("Context does not exist with id " + request.getContextId())
+              .asRuntimeException());
+        } else {
+          final ActiveContext context = GRPCDriverService.this.activeContextMap.get(request.getContextId());
+          if (request.getOperationCase() == ActiveContextRequest.OperationCase.CLOSE_CONTEXT) {
+            if (request.getCloseContext()) {
+              try {
+                LOG.log(Level.INFO, "closing context " + context.getId());
+                context.close();
+              } finally {
+                responseObserver.onNext(null);
+                responseObserver.onCompleted();
+              }
+            } else {
+              LOG.log(Level.SEVERE, "Close context operation not set to true");
+              responseObserver.onError(Status.INTERNAL
+                  .withDescription("Close context operation not set to true")
+                  .asRuntimeException());
+            }
+          } else if (request.getOperationCase() == ActiveContextRequest.OperationCase.MESSAGE) {
+            if (request.getMessage() != null) {
+              try {
+                LOG.log(Level.INFO, "send message to context " + context.getId());
+                context.sendMessage(request.getMessage().toByteArray());
+              } finally {
+                responseObserver.onNext(null);
+                responseObserver.onCompleted();
+              }
+            } else {
+              responseObserver.onError(Status.INTERNAL
+                  .withDescription("Empty message on operation send message").asRuntimeException());
+            }
+          } else if (request.getOperationCase() == ActiveContextRequest.OperationCase.NEW_CONTEXT_REQUEST) {
+            try {
+              LOG.log(Level.INFO, "submitting child context to context " + context.getId());
+              ((EvaluatorContext) context).submitContext(request.getNewContextRequest());
+            } finally {
+              responseObserver.onNext(null);
+              responseObserver.onCompleted();
+            }
+          } else if (request.getOperationCase() == ActiveContextRequest.OperationCase.NEW_TASK_REQUEST) {
+            try {
+              LOG.log(Level.INFO, "submitting task to context " + context.getId());
+              ((EvaluatorContext) context).submitTask(request.getNewTaskRequest());
+            } finally {
+              responseObserver.onNext(null);
+              responseObserver.onCompleted();
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    public void runningTaskOp(
+        final RunningTaskRequest request,
+        final StreamObserver<Void> responseObserver) {
+      synchronized (GRPCDriverService.this) {
+        if (!GRPCDriverService.this.runningTaskMap.containsKey(request.getTaskId())) {
+          responseObserver.onError(Status.INTERNAL
+              .withDescription("Task does not exist with id " + request.getTaskId()).asRuntimeException());
+        }
+        try {
+          final RunningTask task = GRPCDriverService.this.runningTaskMap.get(request.getTaskId());
+          if (request.getCloseTask()) {
+            if (request.getMessage() != null) {
+              task.close(request.getMessage().toByteArray());
+            } else {
+              task.close();
+            }
+          } else if (request.getMessage() != null) {
+            task.send(request.getMessage().toByteArray());
+          }
+        } finally {
+          responseObserver.onNext(null);
+          responseObserver.onCompleted();
+        }
+      }
+    }
+
+    private void setCLRProcess(
+        final AllocatedEvaluator evaluator,
+        final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
+      final CLRProcess process = GRPCDriverService.this.clrProcessFactory.newEvaluatorProcess();
+      if (processRequest.getMemoryMb() > 0) {
+        process.setMemory(processRequest.getMemoryMb());
+      }
+      if (processRequest.getConfigurationFileName() != null) {
+        process.setConfigurationFileName(processRequest.getConfigurationFileName());
+      }
+      if (processRequest.getStandardOut() != null) {
+        process.setStandardOut(processRequest.getStandardOut());
+      }
+      if (processRequest.getStandardErr() != null) {
+        process.setStandardErr(processRequest.getStandardErr());
+      }
+      evaluator.setProcess(process);
+    }
+
+    private void setJVMProcess(
+        final AllocatedEvaluator evaluator,
+        final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
+      final JVMProcess process = GRPCDriverService.this.jvmProcessFactory.newEvaluatorProcess();
+      if (processRequest.getMemoryMb() > 0) {
+        process.setMemory(processRequest.getMemoryMb());
+      }
+      if (processRequest.getConfigurationFileName() != null) {
+        process.setConfigurationFileName(processRequest.getConfigurationFileName());
+      }
+      if (processRequest.getStandardOut() != null) {
+        process.setStandardOut(processRequest.getStandardOut());
+      }
+      if (processRequest.getStandardErr() != null) {
+        process.setStandardErr(processRequest.getStandardErr());
+      }
+      if (processRequest.getOptionsCount() > 0) {
+        for (final String option : processRequest.getOptionsList()) {
+          process.addOption(option);
+        }
+      }
+      evaluator.setProcess(process);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java
new file mode 100644
index 0000000..f0334c5
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.service.grpc;
+
+import org.apache.reef.bridge.driver.service.DriverServiceConfiguration;
+import org.apache.reef.bridge.driver.service.DriverServiceConfigurationProviderBase;
+import org.apache.reef.bridge.proto.ClientProtocol;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+
+import javax.inject.Inject;
+import java.util.logging.Logger;
+
+/**
+ * GRPC driver service configuration provider.
+ */
+public final class GRPCDriverServiceConfigurationProvider extends DriverServiceConfigurationProviderBase {
+
+  private static final Logger LOG = Logger.getLogger(GRPCDriverServiceConfigurationProvider.class.getName());
+
+  @Inject
+  private GRPCDriverServiceConfigurationProvider() {
+  }
+
+  @Override
+  public Configuration getConfiguration(final ClientProtocol.DriverClientConfiguration driverConfiguration) {
+    Configuration driverServiceConfiguration = DriverServiceConfiguration.CONF
+        .set(DriverServiceConfiguration.DRIVER_SERVICE_IMPL, GRPCDriverService.class)
+        .set(DriverServiceConfiguration.DRIVER_CLIENT_COMMAND, driverConfiguration.getDriverClientLaunchCommand())
+        .build();
+    return Configurations.merge(
+        driverServiceConfiguration,
+        getDriverConfiguration(driverConfiguration),
+        getTcpPortRangeConfiguration(driverConfiguration));
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java
new file mode 100644
index 0000000..a94328d
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * GRPC implementation for driver bridge service.
+ */
+package org.apache.reef.bridge.service.grpc;

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java
new file mode 100644
index 0000000..9b58cb0
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The Java-side of the CLR/Java bridge interop via gRPC/Protocol Buffers.
+ */
+package org.apache.reef.bridge.driver.service;

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java
new file mode 100644
index 0000000..255f60d
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.service.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * What command to use when starting bridge process.
+ */
+@NamedParameter(doc = "The command to launch bridge driver process",
+    short_name = "command")
+public final class DriverClientCommand implements Name<String> {
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java
new file mode 100644
index 0000000..6a3b956
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver bridge service parameters.
+ */
+package org.apache.reef.bridge.service.parameters;

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java
deleted file mode 100644
index dac1200..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.examples;
-
-import org.apache.reef.runtime.common.files.RuntimePathProvider;
-
-import javax.inject.Inject;
-/**
- * Supplies the java binary's path for HDInsight.
- */
-public final class WindowsRuntimePathProvider implements RuntimePathProvider {
-
-  @Inject
-  public WindowsRuntimePathProvider() {
-  }
-
-  @Override
-  public String getPath() {
-    return "java";
-  }
-
-  @Override
-  public String toString() {
-    return getPath();
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java
index 020a0eb..79b2ee0 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java
@@ -20,6 +20,8 @@ package org.apache.reef.bridge.examples.hello;
 
 import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.FailedTask;
 import org.apache.reef.driver.task.TaskConfiguration;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.annotations.Unit;
@@ -80,4 +82,26 @@ public final class HelloDriver {
       allocatedEvaluator.submitTask(taskConfiguration);
     }
   }
+
+  /**
+   * bla bla.
+   */
+  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask value) {
+      LOG.log(Level.INFO, "Completed task {0}", value.getId());
+      value.getActiveContext().close();
+    }
+  }
+
+  /**
+   * bla bla.
+   */
+  public final class FailedTaskHandler implements EventHandler<FailedTask> {
+    @Override
+    public void onNext(final FailedTask value) {
+      LOG.log(Level.INFO, "Failed task {0}", value.getId());
+      value.getActiveContext().get().close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java
index 928828c..e80cf1e 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java
@@ -18,10 +18,9 @@
  */
 package org.apache.reef.bridge.examples.hello;
 
-import org.apache.reef.bridge.client.DriverClientConfiguration;
+import org.apache.reef.bridge.driver.client.DriverClientConfiguration;
 import org.apache.reef.bridge.proto.ClientProtocol;
-import org.apache.reef.bridge.service.DriverServiceLauncher;
-import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.bridge.client.DriverServiceLauncher;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.util.EnvironmentUtils;
@@ -43,6 +42,8 @@ public final class HelloREEF {
       DriverClientConfiguration.CONF
           .set(DriverClientConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
           .set(DriverClientConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+          .set(DriverClientConfiguration.ON_TASK_COMPLETED, HelloDriver.CompletedTaskHandler.class)
+          .set(DriverClientConfiguration.ON_TASK_FAILED, HelloDriver.FailedTaskHandler.class)
           .build();
 
   /**
@@ -58,15 +59,10 @@ public final class HelloREEF {
     builder.setLocalRuntime(ClientProtocol.LocalRuntimeParameters.newBuilder()
         .setMaxNumberOfEvaluators(1)
         .build());
-    builder.addHandler(ClientProtocol.DriverClientConfiguration.Handlers.START);
-    builder.addHandler(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_ALLOCATED);
     builder.addGlobalLibraries(EnvironmentUtils.getClassLocation(HelloDriver.class));
 
-    final LauncherStatus status =
-        DriverServiceLauncher.submit(builder.build(), DRIVER_CONFIG);
-
-    LOG.log(Level.INFO, "REEF job completed: {0}", status);
-
+    DriverServiceLauncher.submit(builder.build(), DRIVER_CONFIG);
+    LOG.log(Level.INFO, "REEF job completed");
     ThreadLogger.logThreads(LOG, Level.FINE, "Threads running at the end of HelloREEF:");
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java
deleted file mode 100644
index 4f83fdb..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.bridge.service;
-
-/**
- * An exception thrown by the driver client.
- */
-public final class DriverClientException extends Exception {
-
-  public DriverClientException(final String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java
deleted file mode 100644
index e3f74c8..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.service;
-
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.bridge.service.parameters.DriverClientCommand;
-import org.apache.reef.client.DriverConfiguration;
-import org.apache.reef.driver.parameters.DriverIdleSources;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.RequiredImpl;
-import org.apache.reef.tang.formats.RequiredParameter;
-
-/**
- * Binds all driver bridge service handlers to the driver.
- */
-@Private
-public final class DriverServiceConfiguration extends ConfigurationModuleBuilder {
-
-  public static final RequiredImpl<IDriverService> DRIVER_SERVICE_IMPL = new RequiredImpl<>();
-
-  public static final RequiredParameter<String> DRIVER_CLIENT_COMMAND = new RequiredParameter<>();
-
-  /** Configuration module that binds all driver handlers. */
-  public static final ConfigurationModule CONF = new DriverServiceConfiguration()
-      .merge(DriverConfiguration.CONF)
-      .bindImplementation(IDriverService.class, DRIVER_SERVICE_IMPL)
-      .bindNamedParameter(DriverClientCommand.class, DRIVER_CLIENT_COMMAND)
-      .bindSetEntry(DriverIdleSources.class, IDriverService.class)
-      .build();
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java
deleted file mode 100644
index cca2436..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.service;
-
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.context.ContextMessage;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.CompletedEvaluator;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.driver.task.*;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
-import org.apache.reef.wake.time.event.StopTime;
-
-import javax.inject.Inject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Contains Java side event handlers that perform
- * hand-off with the driver client side.
- */
-@Unit
-public final class DriverServiceHandlers {
-
-  private static final Logger LOG = Logger.getLogger(DriverServiceHandlers.class.getName());
-
-  private final IDriverService driverBridgeService;
-
-  @Inject
-  private DriverServiceHandlers(
-      final IDriverService driverBridgeService) {
-    this.driverBridgeService = driverBridgeService;
-  }
-
-  /**
-   * Job Driver is ready and the clock is set up: request the evaluators.
-   */
-  final class StartHandler implements EventHandler<StartTime> {
-    @Override
-    public void onNext(final StartTime startTime) {
-      LOG.log(Level.INFO, "JavaBridge: Start Driver");
-      DriverServiceHandlers.this.driverBridgeService.startHandler(startTime);
-    }
-  }
-
-  /**
-   * Job Driver is is shutting down: write to the log.
-   */
-  final class StopHandler implements EventHandler<StopTime> {
-    @Override
-    public void onNext(final StopTime stopTime) {
-      LOG.log(Level.INFO, "JavaBridge: Stop Driver");
-      DriverServiceHandlers.this.driverBridgeService.stopHandler(stopTime);
-    }
-  }
-
-  /**
-   * Receive notification that an Evaluator had been allocated,
-   * and submitTask a new Task in that Evaluator.
-   */
-  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
-    @Override
-    public void onNext(final AllocatedEvaluator eval) {
-      LOG.log(Level.INFO, "JavaBridge: Allocated Evaluator {0}", eval.getId());
-      DriverServiceHandlers.this.driverBridgeService.allocatedEvaluatorHandler(eval);
-    }
-  }
-
-  final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
-    @Override
-    public void onNext(final CompletedEvaluator eval) {
-      LOG.log(Level.INFO, "JavaBridge: Completed Evaluator {0}", eval.getId());
-      DriverServiceHandlers.this.driverBridgeService.completedEvaluatorHandler(eval);
-    }
-  }
-
-  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
-    @Override
-    public void onNext(final FailedEvaluator eval) {
-      LOG.log(Level.INFO, "JavaBridge: Failed Evaluator {0}", eval.getId());
-      DriverServiceHandlers.this.driverBridgeService.failedEvaluatorHandler(eval);
-    }
-  }
-
-  /**
-   * Receive notification that the Context is active.
-   */
-  final class ActiveContextHandler implements EventHandler<ActiveContext> {
-    @Override
-    public void onNext(final ActiveContext context) {
-      LOG.log(Level.INFO, "JavaBridge: Active Context {0}", context.getId());
-      DriverServiceHandlers.this.driverBridgeService.activeContextHandler(context);
-    }
-  }
-
-  /**
-   * Received notification that the Context is closed.
-   */
-  final class ClosedContextHandler implements EventHandler<ClosedContext> {
-    @Override
-    public void onNext(final ClosedContext context) {
-      LOG.log(Level.INFO, "JavaBridge: Closed Context {0}", context.getId());
-      DriverServiceHandlers.this.driverBridgeService.closedContextHandler(context);
-    }
-  }
-
-  /**
-   * Received a message from the context.
-   */
-  final class ContextMessageHandler implements EventHandler<ContextMessage> {
-    @Override
-    public void onNext(final ContextMessage message) {
-      LOG.log(Level.INFO, "JavaBridge: Context Message id {0}", message.getId());
-      DriverServiceHandlers.this.driverBridgeService.contextMessageHandler(message);
-    }
-  }
-
-  /**
-   * Received notification that the Context failed.
-   */
-  final class ContextFailedHandler implements EventHandler<FailedContext> {
-    @Override
-    public void onNext(final FailedContext context) {
-      LOG.log(Level.INFO, "JavaBridge: Context Failed {0}", context.getId());
-      DriverServiceHandlers.this.driverBridgeService.failedContextHandler(context);
-    }
-  }
-
-  /**
-   * Receive notification that the Task is running.
-   */
-  final class RunningTaskHandler implements EventHandler<RunningTask> {
-    @Override
-    public void onNext(final RunningTask task) {
-      LOG.log(Level.INFO, "JavaBridge: Running Task {0}", task.getId());
-      DriverServiceHandlers.this.driverBridgeService.runningTaskHandler(task);
-    }
-  }
-
-  /**
-   * Received notification that the Task failed.
-   */
-  final class FailedTaskHandler implements EventHandler<FailedTask> {
-    @Override
-    public void onNext(final FailedTask task) {
-      LOG.log(Level.INFO, "JavaBridge: Failed Task {0}", task.getId());
-      DriverServiceHandlers.this.driverBridgeService.failedTaskHandler(task);
-    }
-  }
-
-  /**
-   * Receive notification that the Task has completed successfully.
-   */
-  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
-    @Override
-    public void onNext(final CompletedTask task) {
-      LOG.log(Level.INFO, "JavaBridge: Completed Task {0}", task.getId());
-      DriverServiceHandlers.this.driverBridgeService.completedTaskHandler(task);
-    }
-  }
-
-  /**
-   * Received notification that the Task was suspended.
-   */
-  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
-    @Override
-    public void onNext(final SuspendedTask task) {
-      LOG.log(Level.INFO, "JavaBridge: Suspended Task {0}", task.getId());
-      DriverServiceHandlers.this.driverBridgeService.suspendedTaskHandler(task);
-    }
-  }
-
-  /**
-   * Received a message from the task.
-   */
-  final class TaskMessageHandler implements EventHandler<TaskMessage> {
-    @Override
-    public void onNext(final TaskMessage message) {
-      LOG.log(Level.INFO, "JavaBridge: Message from Task {0}", message.getId());
-      DriverServiceHandlers.this.driverBridgeService.taskMessageHandler(message);
-    }
-  }
-
-  /**
-   * Received a message from the client.
-   */
-  final class ClientMessageHandler implements EventHandler<byte[]> {
-    @Override
-    public void onNext(final byte[] message) {
-      LOG.log(Level.INFO, "JavaBridge: Message from Client");
-      DriverServiceHandlers.this.driverBridgeService.clientMessageHandler(message);
-    }
-  }
-
-  /**
-   * Received a close event from the client.
-   */
-  final class ClientCloseHandler implements EventHandler<Void> {
-    @Override
-    public void onNext(final Void value) {
-      LOG.log(Level.INFO, "JavaBridge: Close event from Client");
-      DriverServiceHandlers.this.driverBridgeService.clientCloseHandler();
-    }
-  }
-
-  /**
-   * Received a close event with message.
-   */
-  final class ClientCloseWithMessageHandler implements EventHandler<byte[]> {
-    @Override
-    public void onNext(final byte[] message) {
-      LOG.log(Level.INFO, "JavaBridge: Close event with messages from Client");
-      DriverServiceHandlers.this.driverBridgeService.clientCloseWithMessageHandler(message);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java
deleted file mode 100644
index fbafff9..0000000
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.service;
-
-import com.google.protobuf.util.JsonFormat;
-import org.apache.commons.lang.StringUtils;
-import org.apache.reef.bridge.client.JavaDriverClientLauncher;
-import org.apache.reef.bridge.examples.WindowsRuntimePathProvider;
-import org.apache.reef.bridge.proto.ClientProtocol;
-import org.apache.reef.bridge.service.grpc.GRPCDriverService;
-import org.apache.reef.client.DriverConfiguration;
-import org.apache.reef.client.DriverLauncher;
-import org.apache.reef.client.LauncherStatus;
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
-import org.apache.reef.io.TcpPortConfigurationProvider;
-import org.apache.reef.runtime.common.files.ClasspathProvider;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.common.files.RuntimePathProvider;
-import org.apache.reef.runtime.common.files.UnixJVMPathProvider;
-import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
-import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
-import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
-import org.apache.reef.tang.*;
-import org.apache.reef.tang.exceptions.BindException;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
-import org.apache.reef.util.EnvironmentUtils;
-import org.apache.reef.util.OSUtils;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Driver Service Launcher - main class.
- */
-public final class DriverServiceLauncher {
-
-  /**
-   * Standard Java logger.
-   */
-  private static final Logger LOG = Logger.getLogger(DriverServiceLauncher.class.getName());
-
-  /**
-   * This class should not be instantiated.
-   */
-  private DriverServiceLauncher() {
-    throw new RuntimeException("Do not instantiate this class!");
-  }
-
-  /**
-   * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
-   *
-   * @param driverClientConfigurationProto containing which runtime to configure: local, yarn, azbatch
-   * @return (immutable) TANG Configuration object.
-   * @throws BindException      if configuration commandLineInjector fails.
-   * @throws InjectionException if configuration commandLineInjector fails.
-   */
-  private static Configuration getRuntimeConfiguration(
-      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
-      throws BindException {
-    switch (driverClientConfigurationProto.getRuntimeCase()) {
-    case LOCAL_RUNTIME:
-      return getLocalRuntimeConfiguration(driverClientConfigurationProto);
-    case YARN_RUNTIME:
-      return getYarnRuntimeConfiguration(driverClientConfigurationProto);
-    default:
-      throw new IllegalArgumentException("Unsupported runtime " + driverClientConfigurationProto.getRuntimeCase());
-    }
-  }
-
-  private static Configuration getLocalRuntimeConfiguration(
-      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
-      throws BindException {
-    LOG.log(Level.FINE, "JavaBridge: Running on the local runtime");
-    return LocalRuntimeConfiguration.CONF
-        .build();
-  }
-
-  private static Configuration getYarnRuntimeConfiguration(
-      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
-      throws BindException {
-    LOG.log(Level.FINE, "JavaBridge: Running on YARN");
-    return YarnClientConfiguration.CONF.build();
-  }
-
-  private static Configuration getDriverServiceConfiguration(
-      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) {
-    // Set required parameters
-    ConfigurationModule driverServiceConfigurationModule = DriverServiceConfiguration.CONF
-        .set(DriverServiceConfiguration.DRIVER_SERVICE_IMPL, GRPCDriverService.class)
-        .set(DriverServiceConfiguration.DRIVER_CLIENT_COMMAND,
-            driverClientConfigurationProto.getDriverClientLaunchCommand())
-        .set(DriverConfiguration.DRIVER_IDENTIFIER, driverClientConfigurationProto.getJobid());
-
-    // Set file dependencies
-    final List<String> localLibraries = new ArrayList<>();
-    localLibraries.add(EnvironmentUtils.getClassLocation(GRPCDriverService.class));
-    if (driverClientConfigurationProto.getLocalLibrariesCount() > 0) {
-      localLibraries.addAll(driverClientConfigurationProto.getLocalLibrariesList());
-    }
-    driverServiceConfigurationModule = driverServiceConfigurationModule
-        .setMultiple(DriverConfiguration.LOCAL_LIBRARIES, localLibraries);
-    if (driverClientConfigurationProto.getGlobalLibrariesCount() > 0) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES,
-              driverClientConfigurationProto.getGlobalLibrariesList());
-    }
-    if (driverClientConfigurationProto.getLocalFilesCount() > 0) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .setMultiple(DriverConfiguration.LOCAL_FILES,
-              driverClientConfigurationProto.getLocalFilesList());
-    }
-    if (driverClientConfigurationProto.getGlobalFilesCount() > 0) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .setMultiple(DriverConfiguration.GLOBAL_FILES,
-              driverClientConfigurationProto.getGlobalFilesList());
-    }
-    // Setup driver resources
-    if (driverClientConfigurationProto.getCpuCores() > 0) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.DRIVER_CPU_CORES, driverClientConfigurationProto.getCpuCores());
-    }
-    if (driverClientConfigurationProto.getMemoryMb() > 0) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.DRIVER_MEMORY, driverClientConfigurationProto.getMemoryMb());
-    }
-
-    // Setup handlers
-    final Set<ClientProtocol.DriverClientConfiguration.Handlers> handlerLabelSet = new HashSet<>();
-    handlerLabelSet.addAll(driverClientConfigurationProto.getHandlerList());
-    if (!handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.START.START)) {
-      throw new IllegalArgumentException("Start handler required");
-    } else {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_DRIVER_STARTED, DriverServiceHandlers.StartHandler.class)
-          .set(DriverConfiguration.ON_DRIVER_STOP, DriverServiceHandlers.StopHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_ALLOCATED)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverServiceHandlers.AllocatedEvaluatorHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_COMPLETED)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, DriverServiceHandlers.CompletedEvaluatorHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_FAILED)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_EVALUATOR_FAILED, DriverServiceHandlers.FailedEvaluatorHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_ACTIVE)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_CONTEXT_ACTIVE, DriverServiceHandlers.ActiveContextHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_CLOSED)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_CONTEXT_CLOSED, DriverServiceHandlers.ClosedContextHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_FAILED)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_CONTEXT_FAILED, DriverServiceHandlers.ContextFailedHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_MESSAGE)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_CONTEXT_MESSAGE, DriverServiceHandlers.ContextMessageHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_RUNNING)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_TASK_RUNNING, DriverServiceHandlers.RunningTaskHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_COMPLETED)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_TASK_COMPLETED, DriverServiceHandlers.CompletedTaskHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_FAILED)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_TASK_FAILED, DriverServiceHandlers.FailedTaskHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_MESSAGE)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_TASK_MESSAGE, DriverServiceHandlers.TaskMessageHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_MESSAGE)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverServiceHandlers.ClientMessageHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_CLOSE)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_CLIENT_CLOSED, DriverServiceHandlers.ClientCloseHandler.class);
-    }
-    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_CLOSE_WITH_MESSAGE)) {
-      driverServiceConfigurationModule = driverServiceConfigurationModule
-          .set(DriverConfiguration.ON_CLIENT_CLOSED_MESSAGE, DriverServiceHandlers.ClientCloseWithMessageHandler.class);
-    }
-
-    return setTcpPortRange(driverClientConfigurationProto, driverServiceConfigurationModule.build());
-  }
-
-  private static Configuration setTcpPortRange(
-      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto,
-      final Configuration driverServiceConfiguration) {
-    JavaConfigurationBuilder configurationModuleBuilder =
-        Tang.Factory.getTang().newConfigurationBuilder(driverServiceConfiguration)
-            .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class);
-    // Setup TCP constraints
-    if (driverClientConfigurationProto.getTcpPortRangeBegin() > 0) {
-      configurationModuleBuilder = configurationModuleBuilder
-          .bindNamedParameter(TcpPortRangeBegin.class,
-              Integer.toString(driverClientConfigurationProto.getTcpPortRangeBegin()));
-    }
-    if (driverClientConfigurationProto.getTcpPortRangeCount() > 0) {
-      configurationModuleBuilder = configurationModuleBuilder
-          .bindNamedParameter(TcpPortRangeCount.class,
-              Integer.toString(driverClientConfigurationProto.getTcpPortRangeCount()));
-    }
-    if (driverClientConfigurationProto.getTcpPortRangeTryCount() > 0) {
-      configurationModuleBuilder = configurationModuleBuilder
-          .bindNamedParameter(TcpPortRangeCount.class,
-              Integer.toString(driverClientConfigurationProto.getTcpPortRangeTryCount()));
-    }
-    return configurationModuleBuilder.build();
-  }
-
-  public static LauncherStatus submit(
-      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto,
-      final Configuration driverClientConfiguration)
-      throws InjectionException, IOException {
-    ClientProtocol.DriverClientConfiguration.Builder builder =
-        ClientProtocol.DriverClientConfiguration.newBuilder(driverClientConfigurationProto);
-    final File driverClientConfigurationFile = new File("driverclient.conf");
-    try {
-      // Write driver client configuration to a file
-      final Injector driverClientInjector = Tang.Factory.getTang().newInjector(driverClientConfiguration);
-      final ConfigurationSerializer configurationSerializer =
-          driverClientInjector.getInstance(ConfigurationSerializer.class);
-      configurationSerializer.toFile(driverClientConfiguration, driverClientConfigurationFile);
-
-      // Get runtime injector and piece together the launch command based on its classpath info
-      final Configuration runtimeConfiguration = getRuntimeConfiguration(driverClientConfigurationProto);
-      // Resolve OS Runtime Path Provider
-      final Configuration runtimeOSConfiguration = Configurations.merge(
-          Tang.Factory.getTang().newConfigurationBuilder()
-              .bind(RuntimePathProvider.class,
-                  OSUtils.isWindows() ? WindowsRuntimePathProvider.class : UnixJVMPathProvider.class)
-              .build(),
-          runtimeConfiguration);
-      final Injector runtimeInjector = Tang.Factory.getTang().newInjector(runtimeOSConfiguration);
-      final REEFFileNames fileNames = runtimeInjector.getInstance(REEFFileNames.class);
-      final ClasspathProvider classpathProvider = runtimeInjector.getInstance(ClasspathProvider.class);
-      final RuntimePathProvider runtimePathProvider = runtimeInjector.getInstance(RuntimePathProvider.class);
-      final List<String> launchCommand = new JavaLaunchCommandBuilder(JavaDriverClientLauncher.class, null)
-          .setConfigurationFilePaths(
-              Collections.singletonList("./" + fileNames.getLocalFolderPath() + "/" +
-                  driverClientConfigurationFile.getName()))
-          .setJavaPath(runtimePathProvider.getPath())
-          .setClassPath(classpathProvider.getEvaluatorClasspath())
-          .build();
-      final String cmd = StringUtils.join(launchCommand, ' ');
-      builder.setDriverClientLaunchCommand(cmd);
-      builder.addLocalFiles(driverClientConfigurationFile.getAbsolutePath());
-
-
-
-      // Configure driver service and launch the job
-      final Configuration driverServiceConfiguration = getDriverServiceConfiguration(builder.build());
-      return DriverLauncher.getLauncher(runtimeOSConfiguration).run(driverServiceConfiguration);
-    } finally {
-      driverClientConfigurationFile.delete();
-    }
-  }
-
-  /**
-   * Main method that launches the REEF job.
-   *
-   * @param args command line parameters.
-   */
-  public static void main(final String[] args) {
-    try {
-      if (args.length != 1) {
-        LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() +
-            " accepts single argument referencing a file that contains a client protocol buffer driver configuration");
-      }
-      final String content;
-      try {
-        content = new String(Files.readAllBytes(Paths.get(args[0])));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder =
-          ClientProtocol.DriverClientConfiguration.newBuilder();
-      JsonFormat.parser()
-          .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry())
-          .merge(content, driverClientConfigurationProtoBuilder);
-      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto =
-          driverClientConfigurationProtoBuilder.build();
-
-      final Configuration runtimeConfig = getRuntimeConfiguration(driverClientConfigurationProto);
-      final Configuration driverConfig = getDriverServiceConfiguration(driverClientConfigurationProto);
-      DriverLauncher.getLauncher(runtimeConfig).run(driverConfig);
-      LOG.log(Level.INFO, "JavaBridge: Stop Client {0}", driverClientConfigurationProto.getJobid());
-    } catch (final BindException | InjectionException | IOException ex) {
-      LOG.log(Level.SEVERE, "Job configuration error", ex);
-    }
-  }
-}