You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/05/01 20:36:36 UTC
[4/6] reef git commit: [REEF-2003] Revise Driver Service to allow
static configuration.
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DefaultDriverClientStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DefaultDriverClientStopHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DefaultDriverClientStopHandler.java
new file mode 100644
index 0000000..0424e47
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DefaultDriverClientStopHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Stop handler used by the Java driver client by default; it only logs the stop time at FINEST.
+ */
+public final class DefaultDriverClientStopHandler implements EventHandler<StopTime> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultDriverClientStopHandler.class.getName());
+
+  @Inject
+  private DefaultDriverClientStopHandler() { }
+
+  @Override
+  public void onNext(final StopTime value) {
+    LOG.log(Level.FINEST, "Stop time {0}", value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java
new file mode 100644
index 0000000..ca3817b
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.Time;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.runtime.Timer;
+import org.apache.reef.wake.time.runtime.event.ClientAlarm;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The bridge driver client clock.
+ *
+ * <p>Alarms are tracked by id: {@link #scheduleAlarm} stores the alarm and hands the id
+ * and offset to the driver service client; {@link #onNext(String)} runs the stored alarm
+ * when it receives that id. These calls may arrive on different threads, so the alarm
+ * table is a synchronized map and the closed flag is an atomic boolean.
+ */
+public final class DriverClientClock implements Clock, IAlarmDispatchHandler {
+
+  private static final Logger LOG = Logger.getLogger(DriverClientClock.class.getName());
+
+  private final IDriverClientService driverClientService;
+
+  private final IDriverServiceClient driverServiceClient;
+
+  private final Timer timer;
+
+  /** Outstanding alarms keyed by alarm id. */
+  private final Map<String, ClientAlarm> alarmMap =
+      Collections.synchronizedMap(new HashMap<String, ClientAlarm>());
+
+  /** Flipped exactly once, by whichever stop/close call comes first. */
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  @Inject
+  private DriverClientClock(
+      final Timer timer,
+      final IDriverClientService driverClientService,
+      final IDriverServiceClient driverServiceClient) {
+    this.timer = timer;
+    this.driverClientService = driverClientService;
+    this.driverServiceClient = driverServiceClient;
+  }
+
+  /**
+   * Records a new alarm under a random id and passes that id to the driver service client.
+   * @param offset offset added to the timer's current value; also forwarded to the service client
+   * @param handler handler wrapped by the returned alarm
+   * @return the alarm that {@link #onNext(String)} runs when it receives the generated id
+   */
+  @Override
+  public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
+    final ClientAlarm alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler);
+    final String alarmId = UUID.randomUUID().toString();
+    // Store the alarm before announcing the id so a callback with that id can always find it.
+    this.alarmMap.put(alarmId, alarm);
+    this.driverServiceClient.onSetAlarm(alarmId, offset);
+    return alarm;
+  }
+
+  /**
+   * Equivalent to {@link #stop()}.
+   */
+  @Override
+  public void close() {
+    stop();
+  }
+
+  /**
+   * Marks the clock closed and calls onShutdown on the service client; later stop/close calls do nothing.
+   */
+  @Override
+  public void stop() {
+    if (this.closed.compareAndSet(false, true)) {
+      this.driverServiceClient.onShutdown();
+    }
+  }
+
+  /**
+   * Marks the clock closed and passes the exception to onShutdown on the service client;
+   * later stop/close calls do nothing.
+   * @param exception the cause handed to the driver service client
+   */
+  @Override
+  public void stop(final Throwable exception) {
+    if (this.closed.compareAndSet(false, true)) {
+      this.driverServiceClient.onShutdown(exception);
+    }
+  }
+
+  /**
+   * Reports whether the clock has nothing left to do.
+   * @return true if the clock is closed or no alarm is outstanding
+   */
+  @Override
+  public boolean isIdle() {
+    return this.closed.get() || this.alarmMap.isEmpty();
+  }
+
+  /**
+   * Reports whether the clock has been stopped.
+   * @return true once stop or close has been called
+   */
+  @Override
+  public boolean isClosed() {
+    return this.closed.get();
+  }
+
+  /**
+   * Starts the driver client service and then waits in {@code awaitTermination()}.
+   */
+  @Override
+  public void run() {
+    try {
+      this.driverClientService.start();
+      this.driverClientService.awaitTermination();
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    } catch (final InterruptedException e) {
+      // Restore the interrupt status so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Alarm clock event handler.
+   * @param alarmId alarm identifier
+   */
+  @Override
+  public void onNext(final String alarmId) {
+    // A single remove both looks up and claims the alarm, so one id can run at most once.
+    final ClientAlarm clientAlarm = this.alarmMap.remove(alarmId);
+    if (clientAlarm != null) {
+      clientAlarm.run();
+    } else {
+      LOG.log(Level.SEVERE, "Unknown alarm id {0}", alarmId);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientConfiguration.java
new file mode 100644
index 0000000..c11273e
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientConfiguration.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import org.apache.reef.bridge.driver.client.parameters.DriverClientDispatchThreadCount;
+import org.apache.reef.bridge.driver.client.parameters.ClientDriverStopHandler;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalImpl;
+import org.apache.reef.tang.formats.RequiredImpl;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+/**
+ * Driver client configuration: binds the clock, evaluator requestor, client services and event handlers.
+ */
+public final class DriverClientConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * The event handler invoked right after the driver boots up.
+ */
+ public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
+
+ /**
+ * The event handler invoked right before the driver shuts down. Defaults to ignore.
+ */
+ public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
+
+ // ***** EVALUATOR HANDLER BINDINGS:
+
+ /**
+ * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
+ */
+ public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>();
+
+ /**
+ * Event handler for completed evaluators. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed evaluators. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>();
+
+ // ***** TASK HANDLER BINDINGS:
+
+ /**
+ * Event handler for task messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
+ */
+ public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed tasks. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>();
+
+ /**
+ * Event handler for running tasks. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
+
+ /**
+ * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
+ * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
+ */
+ public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>();
+
+ // ***** CLIENT HANDLER BINDINGS:
+
+ /**
+ * Event handler for client messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Event handler for close messages sent by the client. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<Void>> ON_CLIENT_CLOSED = new OptionalImpl<>();
+
+ /**
+ * Event handler for close messages with a payload sent by the client. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_CLOSED_MESSAGE = new OptionalImpl<>();
+
+ // ***** CONTEXT HANDLER BINDINGS:
+
+ /**
+ * Event handler for active context. Defaults to closing the context if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
+
+ /**
+ * Event handler for closed context. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed context. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>();
+
+ /**
+ * Event handler for context messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Number of dispatch threads to use.
+ */
+ public static final OptionalImpl<Integer> CLIENT_DRIVER_DISPATCH_THREAD_COUNT = new OptionalImpl<>();
+
+ /**
+ * Alarm dispatch handler.
+ */
+ public static final OptionalImpl<IAlarmDispatchHandler> ALARM_DISPATCH_HANDLER = new OptionalImpl<>();
+
+ /**
+ * Default to gRPC Driver Client Service.
+ */
+ public static final OptionalImpl<IDriverClientService> DRIVER_CLIENT_SERVICE = new OptionalImpl<>();
+
+ /**
+ * Default to gRPC Driver Service Client.
+ */
+ public static final OptionalImpl<IDriverServiceClient> DRIVER_SERVICE_CLIENT = new OptionalImpl<>();
+
+ /**
+ * ConfigurationModule to fill out to get a legal Driver Client Configuration.
+ */
+ public static final ConfigurationModule CONF = new DriverClientConfiguration()
+ .bindImplementation(Clock.class, DriverClientClock.class)
+ .bindImplementation(EvaluatorRequestor.class, DriverClientEvaluatorRequestor.class)
+ .bindImplementation(IAlarmDispatchHandler.class, ALARM_DISPATCH_HANDLER)
+ .bindImplementation(IDriverClientService.class, DRIVER_CLIENT_SERVICE)
+ .bindImplementation(IDriverServiceClient.class, DRIVER_SERVICE_CLIENT)
+
+ .bindNamedParameter(DriverClientDispatchThreadCount.class, CLIENT_DRIVER_DISPATCH_THREAD_COUNT)
+
+ // Driver start/stop handlers
+ .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED)
+ .bindSetEntry(ClientDriverStopHandler.class, ON_DRIVER_STOP)
+
+ // Evaluator handlers
+ .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED)
+ .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED)
+ .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED)
+
+ // Task handlers
+ .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING)
+ .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED)
+ .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE)
+ .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED)
+ .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED)
+
+ // Context handlers
+ .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
+ .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED)
+ .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
+ .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED)
+
+ // Client handlers
+ .bindSetEntry(ClientMessageHandlers.class, ON_CLIENT_MESSAGE)
+ .bindSetEntry(ClientCloseHandlers.class, ON_CLIENT_CLOSED)
+ .bindSetEntry(ClientCloseWithMessageHandlers.class, ON_CLIENT_CLOSED_MESSAGE)
+
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java
new file mode 100644
index 0000000..8c4eb28
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import com.google.common.collect.Sets;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.bridge.driver.client.parameters.DriverClientDispatchThreadCount;
+import org.apache.reef.bridge.driver.client.parameters.ClientDriverStopHandler;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.utils.DispatchingEStage;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+/**
+ * Routes client driver events to their registered handlers through dispatching stages.
+ */
+@Private
+public final class DriverClientDispatcher {
+
+  /**
+   * Stage serving the event handlers supplied by the application.
+   */
+  private final DispatchingEStage applicationDispatcher;
+
+  /**
+   * Stage serving the client close handlers.
+   */
+  private final DispatchingEStage clientCloseDispatcher;
+
+  /**
+   * Stage serving the client close-with-message handlers.
+   */
+  private final DispatchingEStage clientCloseWithMessageDispatcher;
+
+  /**
+   * Stage serving the client message handlers.
+   */
+  private final DispatchingEStage clientMessageDispatcher;
+
+  /**
+   * Stage serving the alarm dispatch handler.
+   */
+  private final DispatchingEStage alarmDispatcher;
+
+  /**
+   * Stage serving the driver restart handlers.
+   */
+  private final DispatchingEStage driverRestartDispatcher;
+
+  @Inject
+  private DriverClientDispatcher(
+      final DriverClientExceptionHandler exceptionHandler,
+      final IAlarmDispatchHandler alarmDispatchHandler,
+      @Parameter(DriverClientDispatchThreadCount.class)
+      final Integer threadCount,
+      // Start and stop handlers supplied by the application
+      @Parameter(DriverStartHandler.class)
+      final Set<EventHandler<StartTime>> startHandlers,
+      @Parameter(ClientDriverStopHandler.class)
+      final Set<EventHandler<StopTime>> stopHandlers,
+      // Context event handlers supplied by the application
+      @Parameter(ContextActiveHandlers.class)
+      final Set<EventHandler<ActiveContext>> contextActiveHandlers,
+      @Parameter(ContextClosedHandlers.class)
+      final Set<EventHandler<ClosedContext>> contextClosedHandlers,
+      @Parameter(ContextFailedHandlers.class)
+      final Set<EventHandler<FailedContext>> contextFailedHandlers,
+      @Parameter(ContextMessageHandlers.class)
+      final Set<EventHandler<ContextMessage>> contextMessageHandlers,
+      // Task event handlers supplied by the application
+      @Parameter(TaskRunningHandlers.class)
+      final Set<EventHandler<RunningTask>> taskRunningHandlers,
+      @Parameter(TaskCompletedHandlers.class)
+      final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
+      @Parameter(TaskSuspendedHandlers.class)
+      final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
+      @Parameter(TaskMessageHandlers.class)
+      final Set<EventHandler<TaskMessage>> taskMessageHandlers,
+      @Parameter(TaskFailedHandlers.class)
+      final Set<EventHandler<FailedTask>> taskFailedHandlers,
+      // Evaluator event handlers supplied by the application
+      @Parameter(EvaluatorAllocatedHandlers.class)
+      final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
+      @Parameter(EvaluatorFailedHandlers.class)
+      final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
+      @Parameter(EvaluatorCompletedHandlers.class)
+      final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
+      // Handlers for events coming from the client
+      @Parameter(ClientCloseHandlers.class)
+      final Set<EventHandler<Void>> clientCloseHandlers,
+      @Parameter(ClientCloseWithMessageHandlers.class)
+      final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers,
+      @Parameter(ClientMessageHandlers.class)
+      final Set<EventHandler<byte[]>> clientMessageHandlers) {
+
+    applicationDispatcher = new DispatchingEStage(
+        exceptionHandler, threadCount, "ClientDriverDispatcher");
+    // Driver start / stop
+    applicationDispatcher.register(StartTime.class, startHandlers);
+    applicationDispatcher.register(StopTime.class, stopHandlers);
+    // Context events
+    applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
+    applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
+    applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
+    applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);
+
+    // Task events
+    applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
+    applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
+    applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
+    applicationDispatcher.register(TaskMessage.class, taskMessageHandlers);
+    applicationDispatcher.register(FailedTask.class, taskFailedHandlers);
+
+    // Evaluator events
+    applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);
+    applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers);
+    applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers);
+
+    // Client events: each kind gets its own stage derived from the application stage
+    clientCloseDispatcher = new DispatchingEStage(applicationDispatcher);
+    clientCloseDispatcher.register(Void.class, clientCloseHandlers);
+
+    clientCloseWithMessageDispatcher = new DispatchingEStage(applicationDispatcher);
+    clientCloseWithMessageDispatcher.register(byte[].class, clientCloseWithMessageHandlers);
+
+    clientMessageDispatcher = new DispatchingEStage(applicationDispatcher);
+    clientMessageDispatcher.register(byte[].class, clientMessageHandlers);
+
+    // Alarm ids go to the single alarm dispatch handler
+    alarmDispatcher = new DispatchingEStage(applicationDispatcher);
+    alarmDispatcher.register(String.class,
+        Sets.newHashSet((EventHandler<String>)alarmDispatchHandler));
+
+    // Restart stage; this constructor registers no handlers on it
+    driverRestartDispatcher = new DispatchingEStage(applicationDispatcher);
+  }
+
+  @Inject
+  private DriverClientDispatcher(
+      final DriverClientExceptionHandler exceptionHandler,
+      final IAlarmDispatchHandler alarmDispatchHandler,
+      @Parameter(DriverClientDispatchThreadCount.class)
+      final Integer threadCount,
+      // Start and stop handlers supplied by the application
+      @Parameter(DriverStartHandler.class)
+      final Set<EventHandler<StartTime>> startHandlers,
+      @Parameter(ClientDriverStopHandler.class)
+      final Set<EventHandler<StopTime>> stopHandlers,
+      // Context event handlers supplied by the application
+      @Parameter(ContextActiveHandlers.class)
+      final Set<EventHandler<ActiveContext>> contextActiveHandlers,
+      @Parameter(ContextClosedHandlers.class)
+      final Set<EventHandler<ClosedContext>> contextClosedHandlers,
+      @Parameter(ContextFailedHandlers.class)
+      final Set<EventHandler<FailedContext>> contextFailedHandlers,
+      @Parameter(ContextMessageHandlers.class)
+      final Set<EventHandler<ContextMessage>> contextMessageHandlers,
+      // Task event handlers supplied by the application
+      @Parameter(TaskRunningHandlers.class)
+      final Set<EventHandler<RunningTask>> taskRunningHandlers,
+      @Parameter(TaskCompletedHandlers.class)
+      final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
+      @Parameter(TaskSuspendedHandlers.class)
+      final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
+      @Parameter(TaskMessageHandlers.class)
+      final Set<EventHandler<TaskMessage>> taskMessageHandlers,
+      @Parameter(TaskFailedHandlers.class)
+      final Set<EventHandler<FailedTask>> taskFailedHandlers,
+      // Evaluator event handlers supplied by the application
+      @Parameter(EvaluatorAllocatedHandlers.class)
+      final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
+      @Parameter(EvaluatorFailedHandlers.class)
+      final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
+      @Parameter(EvaluatorCompletedHandlers.class)
+      final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
+      // Handlers for events coming from the client
+      @Parameter(ClientCloseHandlers.class)
+      final Set<EventHandler<Void>> clientCloseHandlers,
+      @Parameter(ClientCloseWithMessageHandlers.class)
+      final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers,
+      @Parameter(ClientMessageHandlers.class)
+      final Set<EventHandler<byte[]>> clientMessageHandlers,
+      // Handlers for driver restart events
+      @Parameter(DriverRestartHandler.class)
+      final Set<EventHandler<DriverRestarted>> driverRestartHandlers,
+      @Parameter(DriverRestartTaskRunningHandlers.class)
+      final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
+      @Parameter(DriverRestartContextActiveHandlers.class)
+      final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
+      @Parameter(DriverRestartCompletedHandlers.class)
+      final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
+      @Parameter(DriverRestartFailedEvaluatorHandlers.class)
+      final Set<EventHandler<FailedEvaluator>> driverRestartFailedEvaluatorHandlers) {
+    this(
+        exceptionHandler,
+        alarmDispatchHandler,
+        threadCount,
+        startHandlers,
+        stopHandlers,
+        contextActiveHandlers,
+        contextClosedHandlers,
+        contextFailedHandlers,
+        contextMessageHandlers,
+        taskRunningHandlers,
+        taskCompletedHandlers,
+        taskSuspendedHandlers,
+        taskMessageHandlers,
+        taskFailedHandlers,
+        evaluatorAllocatedHandlers,
+        evaluatorFailedHandlers,
+        evaluatorCompletedHandlers,
+        clientCloseHandlers,
+        clientCloseWithMessageHandlers,
+        clientMessageHandlers);
+    // On top of the common setup, attach the restart handlers to the restart stage.
+    driverRestartDispatcher.register(DriverRestarted.class, driverRestartHandlers);
+    driverRestartDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
+    driverRestartDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
+    driverRestartDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers);
+    driverRestartDispatcher.register(FailedEvaluator.class, driverRestartFailedEvaluatorHandlers);
+  }
+
+  public void dispatch(final StartTime startTime) {
+    applicationDispatcher.onNext(StartTime.class, startTime);
+  }
+
+  public void dispatch(final StopTime stopTime) {
+    applicationDispatcher.onNext(StopTime.class, stopTime);
+  }
+
+  public void dispatch(final AllocatedEvaluator evaluator) {
+    applicationDispatcher.onNext(AllocatedEvaluator.class, evaluator);
+  }
+
+  public void dispatch(final CompletedEvaluator evaluator) {
+    applicationDispatcher.onNext(CompletedEvaluator.class, evaluator);
+  }
+
+  public void dispatch(final FailedEvaluator evaluator) {
+    applicationDispatcher.onNext(FailedEvaluator.class, evaluator);
+  }
+
+  public void dispatch(final ActiveContext context) {
+    applicationDispatcher.onNext(ActiveContext.class, context);
+  }
+
+  public void dispatch(final ClosedContext context) {
+    applicationDispatcher.onNext(ClosedContext.class, context);
+  }
+
+  public void dispatch(final FailedContext context) {
+    applicationDispatcher.onNext(FailedContext.class, context);
+  }
+
+  public void dispatch(final ContextMessage message) {
+    applicationDispatcher.onNext(ContextMessage.class, message);
+  }
+
+  public void dispatch(final RunningTask task) {
+    applicationDispatcher.onNext(RunningTask.class, task);
+  }
+
+  public void dispatch(final CompletedTask task) {
+    applicationDispatcher.onNext(CompletedTask.class, task);
+  }
+
+  public void dispatch(final FailedTask task) {
+    applicationDispatcher.onNext(FailedTask.class, task);
+  }
+
+  public void dispatch(final SuspendedTask task) {
+    applicationDispatcher.onNext(SuspendedTask.class, task);
+  }
+
+  public void dispatch(final TaskMessage message) {
+    applicationDispatcher.onNext(TaskMessage.class, message);
+  }
+
+  public void clientMessageDispatch(final byte[] message) {
+    clientMessageDispatcher.onNext(byte[].class, message);
+  }
+
+  public void clientCloseDispatch() {
+    clientCloseDispatcher.onNext(Void.class, null);
+  }
+
+  public void clientCloseWithMessageDispatch(final byte[] message) {
+    clientCloseWithMessageDispatcher.onNext(byte[].class, message);
+  }
+
+  public void dispatchAlarm(final String alarmId) {
+    alarmDispatcher.onNext(String.class, alarmId);
+  }
+
+  public void dispatchRestart(final DriverRestarted driverRestarted) {
+    driverRestartDispatcher.onNext(DriverRestarted.class, driverRestarted);
+  }
+
+  public void dispatchRestart(final RunningTask task) {
+    driverRestartDispatcher.onNext(RunningTask.class, task);
+  }
+
+  public void dispatchRestart(final ActiveContext context) {
+    driverRestartDispatcher.onNext(ActiveContext.class, context);
+  }
+
+  public void dispatchRestart(final DriverRestartCompleted completed) {
+    driverRestartDispatcher.onNext(DriverRestartCompleted.class, completed);
+  }
+
+  public void dispatchRestart(final FailedEvaluator evaluator) {
+    driverRestartDispatcher.onNext(FailedEvaluator.class, evaluator);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java
new file mode 100644
index 0000000..54692ec
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+
+import javax.inject.Inject;
+
+/**
+ * {@link EvaluatorRequestor} for the driver client: every evaluator request
+ * is handed to the injected {@link IDriverServiceClient}.
+ */
+public final class DriverClientEvaluatorRequestor implements EvaluatorRequestor {
+
+  private final IDriverServiceClient driverServiceClient;
+
+  @Inject
+  private DriverClientEvaluatorRequestor(final IDriverServiceClient driverServiceClient) {
+    this.driverServiceClient = driverServiceClient;
+  }
+
+  /**
+   * Passes the request on via {@link IDriverServiceClient#onEvaluatorRequest(EvaluatorRequest)}.
+   * @param req the evaluator request to submit
+   */
+  @Override
+  public void submit(final EvaluatorRequest req) {
+    driverServiceClient.onEvaluatorRequest(req);
+  }
+
+  /**
+   * @return a new {@link Builder} bound to this requestor
+   */
+  @Override
+  public EvaluatorRequest.Builder newRequest() {
+    return new DriverClientEvaluatorRequestor.Builder();
+  }
+
+  /**
+   * {@link EvaluatorRequest.Builder} extended with a submit method.
+   * Calling {@link #submit()} builds the {@link EvaluatorRequest} and sends it
+   * through the enclosing requestor.
+   */
+  public final class Builder extends EvaluatorRequest.Builder<DriverClientEvaluatorRequestor.Builder> {
+    @Override
+    public synchronized void submit() {
+      final EvaluatorRequest request = build();
+      DriverClientEvaluatorRequestor.this.submit(request);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java
new file mode 100644
index 0000000..9bd99d6
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver client exception handler: logs every {@link Throwable} it receives.
+ */
+public final class DriverClientExceptionHandler implements EventHandler<Throwable> {
+  private static final Logger LOG = Logger.getLogger(DriverClientExceptionHandler.class.getName());
+
+  @Inject
+  private DriverClientExceptionHandler() {
+    LOG.log(Level.FINE, "Instantiated 'DriverClientExceptionHandler'");
+  }
+
+  /**
+   * Records the reported error at SEVERE level together with its stack trace.
+   * Logging keeps failures visible; the handler takes no other action.
+   * @param throwable the error reported to this handler
+   */
+  @Override
+  public void onNext(final Throwable throwable) {
+    LOG.log(Level.SEVERE, "Exception in driver client", throwable);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IAlarmDispatchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IAlarmDispatchHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IAlarmDispatchHandler.java
new file mode 100644
index 0000000..e8e8fe5
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IAlarmDispatchHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Alarm dispatch handler: an {@link EventHandler} whose events are strings
+ * (presumably alarm identifiers -- confirm against the dispatcher that feeds it).
+ * {@link DriverClientClock} is declared as the default implementation.
+ */
+@DefaultImplementation(DriverClientClock.class)
+public interface IAlarmDispatchHandler extends EventHandler<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java
new file mode 100644
index 0000000..38758bd
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import org.apache.reef.bridge.driver.client.grpc.DriverClientService;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+import java.io.IOException;
+
+/**
+ * Interface that driver client services implement.
+ * {@link DriverClientService} is declared as the default implementation.
+ */
+@DefaultImplementation(DriverClientService.class)
+public interface IDriverClientService {
+
+  /**
+   * Start the DriverClient service.
+   * @throws IOException when unable to start service
+   */
+  void start() throws IOException;
+
+
+  /**
+   * Wait for termination of driver client service.
+   * @throws InterruptedException if the wait is interrupted
+   */
+  void awaitTermination() throws InterruptedException;
+
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java
new file mode 100644
index 0000000..7cc1346
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import org.apache.reef.bridge.driver.client.grpc.DriverServiceClient;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Forwards application requests to driver server.
+ * {@link DriverServiceClient} is declared as the default implementation.
+ */
+@DefaultImplementation(DriverServiceClient.class)
+public interface IDriverServiceClient {
+
+  /**
+   * Initiate shutdown.
+   */
+  void onShutdown();
+
+  /**
+   * Initiate shutdown with error.
+   * @param ex exception error
+   */
+  void onShutdown(final Throwable ex);
+
+  /**
+   * Set alarm.
+   * @param alarmId alarm identifier
+   * @param timeoutMS timeout in milliseconds
+   */
+  void onSetAlarm(final String alarmId, final int timeoutMS);
+
+  /**
+   * Request evaluators.
+   * @param evaluatorRequest event
+   */
+  void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest);
+
+  /**
+   * Close evaluator.
+   * @param evaluatorId to close
+   */
+  void onEvaluatorClose(final String evaluatorId);
+
+  /**
+   * Submit context and/or task.
+   * @param evaluatorId to submit against
+   * @param contextConfiguration context configuration
+   * @param taskConfiguration task configuration
+   * @param evaluatorProcess evaluator process
+   * @param addFileList to include
+   * @param addLibraryList to include
+   */
+  void onEvaluatorSubmit(
+      final String evaluatorId,
+      final Optional<Configuration> contextConfiguration,
+      final Optional<Configuration> taskConfiguration,
+      final Optional<JVMClientProcess> evaluatorProcess,
+      final Optional<List<File>> addFileList,
+      final Optional<List<File>> addLibraryList);
+
+  // Context Operations
+
+  /**
+   * Close context.
+   * @param contextId to close
+   */
+  void onContextClose(final String contextId);
+
+  /**
+   * Submit child context.
+   * @param contextId to submit against
+   * @param contextConfiguration for child context
+   */
+  void onContextSubmitContext(
+      final String contextId,
+      final Configuration contextConfiguration);
+
+  /**
+   * Submit task.
+   * @param contextId to submit against
+   * @param taskConfiguration for task
+   */
+  void onContextSubmitTask(
+      final String contextId,
+      final Configuration taskConfiguration);
+
+  /**
+   * Send message to context.
+   * @param contextId of the destination context
+   * @param message to send
+   */
+  void onContextMessage(final String contextId, final byte[] message);
+
+  // Task operations
+
+  /**
+   * Close the task.
+   * @param taskId to close
+   * @param message optional message to include
+   */
+  void onTaskClose(final String taskId, final Optional<byte[]> message);
+
+  /**
+   * Send task a message.
+   * @param taskId of destination task
+   * @param message to send
+   */
+  void onTaskMessage(final String taskId, final byte[] message);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JVMClientProcess.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JVMClientProcess.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JVMClientProcess.java
new file mode 100644
index 0000000..124be21
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JVMClientProcess.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.EvaluatorProcess;
+import org.apache.reef.driver.evaluator.EvaluatorType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Stub {@link EvaluatorProcess} for the driver client.
+ * It records the settings applied to it and exposes them through getters;
+ * it cannot produce a command line.
+ */
+@Private
+public final class JVMClientProcess implements EvaluatorProcess {
+
+  /** JVM options, in the order they were added. */
+  private final List<String> optionList = new ArrayList<>();
+
+  /** Set to true by {@link #setMemory(int)} and {@link #addOption(String)}. */
+  private boolean optionSet;
+
+  private int megaBytes;
+
+  private String configurationFileName;
+
+  private String standardOut;
+
+  private String standardErr;
+
+  public JVMClientProcess() {
+  }
+
+  /**
+   * Not supported by this stub.
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public List<String> getCommandLine() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return always {@link EvaluatorType#JVM}
+   */
+  @Override
+  public EvaluatorType getType() {
+    return EvaluatorType.JVM;
+  }
+
+  /**
+   * Record the memory size and mark this process as having an option set.
+   * @param mb memory size in megabytes
+   * @return this
+   */
+  @Override
+  public JVMClientProcess setMemory(final int mb) {
+    this.optionSet = true;
+    this.megaBytes = mb;
+    return this;
+  }
+
+  /**
+   * @return the memory size last passed to {@link #setMemory(int)}, or 0 if never set
+   */
+  public int getMemory() {
+    return this.megaBytes;
+  }
+
+  /**
+   * @return true once {@link #setMemory(int)} or {@link #addOption(String)} has been called
+   */
+  @Override
+  public boolean isOptionSet() {
+    return this.optionSet;
+  }
+
+  /**
+   * Record the configuration file name.
+   * @param configurationFileName name to record
+   * @return this
+   */
+  @Override
+  public JVMClientProcess setConfigurationFileName(final String configurationFileName) {
+    this.configurationFileName = configurationFileName;
+    return this;
+  }
+
+  /**
+   * @return the recorded configuration file name, or null if never set
+   */
+  public String getConfigurationFileName() {
+    return this.configurationFileName;
+  }
+
+  /**
+   * Record the standard output setting.
+   * @param standardOut value to record
+   * @return this
+   */
+  @Override
+  public JVMClientProcess setStandardOut(final String standardOut) {
+    this.standardOut = standardOut;
+    return this;
+  }
+
+  /**
+   * @return the recorded standard output setting, or null if never set
+   */
+  public String getStandardOut() {
+    return this.standardOut;
+  }
+
+  /**
+   * Record the standard error setting.
+   * @param standardErr value to record
+   * @return this
+   */
+  @Override
+  public JVMClientProcess setStandardErr(final String standardErr) {
+    this.standardErr = standardErr;
+    return this;
+  }
+
+  /**
+   * @return the recorded standard error setting, or null if never set
+   */
+  public String getStandardErr() {
+    return this.standardErr;
+  }
+
+  /**
+   * Add a JVM option and mark this process as having an option set.
+   * @param option The full option, e.g. "-XX:+PrintGCDetails", "-Xms500m"
+   * @return this
+   */
+  public JVMClientProcess addOption(final String option) {
+    this.optionSet = true;
+    this.optionList.add(option);
+    return this;
+  }
+
+  /**
+   * @return the internal list of JVM options added so far (not a copy)
+   */
+  public List<String> getOptions() {
+    return this.optionList;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java
new file mode 100644
index 0000000..3b675ea
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import org.apache.reef.bridge.driver.client.grpc.DriverClientGrpcConfiguration;
+import org.apache.reef.bridge.driver.client.grpc.parameters.DriverServicePort;
+import org.apache.reef.runtime.common.REEFLauncher;
+import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
+import org.apache.reef.runtime.common.launch.REEFErrorHandler;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler;
+import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.ThreadLogger;
+import org.apache.reef.util.logging.LoggingSetup;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.time.Clock;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver client launcher.
+ * {@link #main(String[])} takes the clock configuration path and the driver
+ * service port from the command line, builds the environment configuration
+ * from them, and runs the {@link Clock} obtained from that configuration.
+ */
+public final class JavaDriverClientLauncher {
+
+  // Log under this class's own name so records are attributed correctly.
+  // NOTE(review): the REEFLauncher import is no longer referenced from code
+  // in this class; drop it if the unused-import check complains.
+  private static final Logger LOG = Logger.getLogger(JavaDriverClientLauncher.class.getName());
+
+  private static final Tang TANG = Tang.Factory.getTang();
+
+  private static final Configuration LAUNCHER_STATIC_CONFIG =
+      TANG.newConfigurationBuilder()
+          .bindNamedParameter(RemoteConfiguration.ManagerName.class, "DRIVER_CLIENT_LAUNCHER")
+          .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
+          .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+          .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class)
+          .build();
+
+  static {
+    LoggingSetup.setupCommonsLogging();
+  }
+
+  /**
+   * Main configuration object of the REEF component we are launching here.
+   */
+  private final Configuration envConfig;
+
+  /**
+   * JavaDriverClientLauncher is instantiated in the main() method below using
+   * Tang configuration file provided as a command line argument.
+   * The resulting environment configuration merges the static launcher
+   * configuration, the gRPC client configuration for the given port, and the
+   * configuration read from disk.
+   * @param driverServicePort Port of the driver service, passed to the gRPC client configuration.
+   * @param configurationPath Path to the serialized Tang configuration file.
+   *                          (The file must be in the local file system).
+   * @param configurationSerializer Serializer used to read the configuration file.
+   */
+  @Inject
+  private JavaDriverClientLauncher(
+      @Parameter(DriverServicePort.class) final Integer driverServicePort,
+      @Parameter(ClockConfigurationPath.class) final String configurationPath,
+      final ConfigurationSerializer configurationSerializer) {
+
+    this.envConfig = Configurations.merge(
+        LAUNCHER_STATIC_CONFIG,
+        DriverClientGrpcConfiguration.CONF
+            .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort)
+            .build(),
+        readConfigurationFromDisk(configurationPath, configurationSerializer));
+  }
+
+  /**
+   * Instantiate the JavaDriverClientLauncher. This method is called from main().
+   * @param clockConfigPath Path to the local file that contains serialized configuration
+   *                        for the driver client.
+   * @param driverServicePort Port of the driver service.
+   * @return An instance of the configured JavaDriverClientLauncher object.
+   */
+  private static JavaDriverClientLauncher getLauncher(final String clockConfigPath, final int driverServicePort) {
+
+    try {
+
+      final Configuration clockArgConfig = Configurations.merge(
+          LAUNCHER_STATIC_CONFIG,
+          DriverClientGrpcConfiguration.CONF
+              .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort)
+              .build(),
+          TANG.newConfigurationBuilder()
+              .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath)
+              .build());
+
+      return TANG.newInjector(clockArgConfig).getInstance(JavaDriverClientLauncher.class);
+
+    } catch (final BindException ex) {
+      throw fatal("Error in parsing the command line", ex);
+    } catch (final InjectionException ex) {
+      throw fatal("Unable to instantiate JavaDriverClientLauncher.", ex);
+    }
+  }
+
+  /**
+   * Read configuration from a given file and deserialize it
+   * into Tang configuration object that can be used for injection.
+   * A missing, unreadable, or unparsable file is reported through
+   * {@link #fatal(String, Throwable)}.
+   * @param configPath Path to the local file that contains the serialized configuration.
+   * @param serializer An object to deserialize the configuration file.
+   * @return Tang configuration read and deserialized from a given file.
+   */
+  private static Configuration readConfigurationFromDisk(
+      final String configPath, final ConfigurationSerializer serializer) {
+
+    LOG.log(Level.FINER, "Loading configuration file: {0}", configPath);
+
+    final File configFile = new File(configPath);
+
+    if (!configFile.exists()) {
+      throw fatal(
+          "Configuration file " + configPath + " does not exist. Can be an issue in job submission.",
+          new FileNotFoundException(configPath));
+    }
+
+    if (!configFile.canRead()) {
+      throw fatal(
+          "Configuration file " + configPath + " exists, but can't be read.",
+          new IOException(configPath));
+    }
+
+    try {
+
+      final Configuration config = serializer.fromFile(configFile);
+      LOG.log(Level.FINEST, "The configuration file loaded: {0}", configPath);
+
+      return config;
+
+    } catch (final IOException e) {
+      throw fatal("Unable to parse the configuration file: " + configPath, e);
+    }
+  }
+
+  /**
+   * Launches the driver client process.
+   * @param args Command-line arguments.
+   *             Must be exactly two elements: the local path to the clock
+   *             configuration file, and the driver service port as a decimal integer.
+   */
+  @SuppressWarnings("checkstyle:illegalcatch")
+  public static void main(final String[] args) {
+
+    LOG.log(Level.INFO, "Entering JavaDriverClientLauncher.main().");
+
+    LOG.log(Level.FINE, "JavaDriverClientLauncher started with user name [{0}]", System.getProperty("user.name"));
+    LOG.log(Level.FINE, "JavaDriverClientLauncher started. Assertions are {0} in this process.",
+        EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
+
+    if (args.length != 2) {
+      final String message = "JavaDriverClientLauncher have two and only two arguments to specify the runtime clock " +
+          "configuration path and driver service port";
+
+      throw fatal(message, new IllegalArgumentException(message));
+    }
+
+    // Report a malformed port through fatal() so it is logged like every other startup error.
+    final int driverServicePort;
+    try {
+      driverServicePort = Integer.parseInt(args[1]);
+    } catch (final NumberFormatException ex) {
+      throw fatal("Driver service port is not a valid integer: " + args[1], ex);
+    }
+
+    final JavaDriverClientLauncher launcher = getLauncher(args[0], driverServicePort);
+
+    Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig));
+    final Injector injector = Tang.Factory.getTang().newInjector(launcher.envConfig);
+    try (final Clock reef = injector.getInstance(Clock.class)) {
+      reef.run();
+    } catch (final Throwable ex) {
+      throw fatal("Unable to configure and start Clock.", ex);
+    }
+
+    ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after Clock.close():");
+
+    LOG.log(Level.INFO, "Exiting JavaDriverClientLauncher.main()");
+
+    System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main()
+  }
+
+  /**
+   * Wrap an exception into RuntimeException with a given message,
+   * and write the same message and exception to the log.
+   * @param msg an error message to log and pass into the RuntimeException.
+   * @param t A Throwable exception to log and wrap.
+   * @return a new Runtime exception wrapping a Throwable.
+   */
+  private static RuntimeException fatal(final String msg, final Throwable t) {
+    LOG.log(Level.SEVERE, msg, t);
+    return new RuntimeException(msg, t);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ActiveContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ActiveContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ActiveContextBridge.java
new file mode 100644
index 0000000..4c97697
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ActiveContextBridge.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client.events;
+
+import org.apache.reef.bridge.driver.client.IDriverServiceClient;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.util.Optional;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Active context bridge.
+ * Holds the identifiers and evaluator descriptor of one active context and
+ * forwards context operations to the {@link IDriverServiceClient}.
+ */
+public final class ActiveContextBridge implements ActiveContext {
+
+  private static final Logger LOG = Logger.getLogger(ActiveContextBridge.class.getName());
+
+  private final IDriverServiceClient driverServiceClient;
+
+  private final String contextId;
+
+  private final Optional<String> parentId;
+
+  private final String evaluatorId;
+
+  private final EvaluatorDescriptor evaluatorDescriptor;
+
+  /**
+   * @param driverServiceClient client that context operations are forwarded to
+   * @param contextId identifier of this context
+   * @param parentId identifier of the parent context, if any
+   * @param evaluatorId identifier of the evaluator reported by {@link #getEvaluatorId()}
+   * @param evaluatorDescriptor descriptor reported by {@link #getEvaluatorDescriptor()}
+   */
+  public ActiveContextBridge(
+      final IDriverServiceClient driverServiceClient,
+      final String contextId,
+      final Optional<String> parentId,
+      final String evaluatorId,
+      final EvaluatorDescriptor evaluatorDescriptor) {
+    this.driverServiceClient = driverServiceClient;
+    this.contextId = contextId;
+    this.parentId = parentId;
+    this.evaluatorId = evaluatorId;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+  }
+
+  /**
+   * Ask the driver service client to close this context.
+   */
+  @Override
+  public void close() {
+    // Parameterized logging: the message is only formatted if INFO is enabled.
+    LOG.log(Level.INFO, "closing context {0}", this.contextId);
+    this.driverServiceClient.onContextClose(this.contextId);
+  }
+
+  /**
+   * Submit a task against this context.
+   * @param taskConf configuration of the task to submit
+   */
+  @Override
+  public void submitTask(final Configuration taskConf) {
+    LOG.log(Level.INFO, "submitting task via context {0}", this.contextId);
+    this.driverServiceClient.onContextSubmitTask(this.contextId, taskConf);
+  }
+
+  /**
+   * Submit a child context against this context.
+   * @param contextConfiguration configuration of the child context
+   */
+  @Override
+  public void submitContext(final Configuration contextConfiguration) {
+    LOG.log(Level.INFO, "submitting child context via context {0}", this.contextId);
+    this.driverServiceClient.onContextSubmitContext(this.contextId, contextConfiguration);
+  }
+
+  /**
+   * Not supported by this bridge.
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void submitContextAndService(
+      final Configuration contextConfiguration,
+      final Configuration serviceConfiguration) {
+    throw new UnsupportedOperationException("Service not supported");
+  }
+
+  /**
+   * Send a message to this context.
+   * @param message raw message bytes to send
+   */
+  @Override
+  public void sendMessage(final byte[] message) {
+    LOG.log(Level.INFO, "sending message to context {0}", this.contextId);
+    this.driverServiceClient.onContextMessage(this.contextId, message);
+  }
+
+  @Override
+  public String getEvaluatorId() {
+    return this.evaluatorId;
+  }
+
+  @Override
+  public Optional<String> getParentId() {
+    return this.parentId;
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorDescriptor;
+  }
+
+  /**
+   * @return the identifier of this context
+   */
+  @Override
+  public String getId() {
+    return this.contextId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/AllocatedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/AllocatedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/AllocatedEvaluatorBridge.java
new file mode 100644
index 0000000..2bc7599
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/AllocatedEvaluatorBridge.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.driver.client.events;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.bridge.driver.client.IDriverServiceClient;
+import org.apache.reef.bridge.driver.client.JVMClientProcess;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorProcess;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.util.Optional;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Allocated Evaluator Stub.
+ */
+@Private
+public final class AllocatedEvaluatorBridge implements AllocatedEvaluator {
+
+ private final String evaluatorId;
+
+ private final EvaluatorDescriptor evaluatorDescriptor;
+
+ private final IDriverServiceClient driverServiceClient;
+
+ private final List<File> addFileList = new ArrayList<>();
+
+ private final List<File> addLibraryList = new ArrayList<>();
+
+ private JVMClientProcess evaluatorProcess = null;
+
+ public AllocatedEvaluatorBridge(
+ final String evaluatorId,
+ final EvaluatorDescriptor evaluatorDescriptor,
+ final IDriverServiceClient driverServiceClient) {
+ this.evaluatorId = evaluatorId;
+ this.evaluatorDescriptor = evaluatorDescriptor;
+ this.driverServiceClient = driverServiceClient;
+ }
+
+ @Override
+ public String getId() {
+ return this.evaluatorId;
+ }
+
+ @Override
+ public void addFile(final File file) {
+ this.addFileList.add(file);
+ }
+
+ @Override
+ public void addLibrary(final File file) {
+ this.addLibraryList.add(file);
+ }
+
+ @Override
+ public EvaluatorDescriptor getEvaluatorDescriptor() {
+ return this.evaluatorDescriptor;
+ }
+
+ @Override
+ public void setProcess(final EvaluatorProcess process) {
+ if (process instanceof JVMClientProcess) {
+ this.evaluatorProcess = (JVMClientProcess) process;
+ } else {
+ throw new IllegalArgumentException(JVMClientProcess.class.getCanonicalName() + " required.");
+ }
+ }
+
+ @Override
+ public void close() {
+ this.driverServiceClient.onEvaluatorClose(getId());
+ }
+
+ @Override
+ public void submitTask(final Configuration taskConfiguration) {
+ this.driverServiceClient.onEvaluatorSubmit(
+ getId(),
+ Optional.<Configuration>empty(),
+ Optional.of(taskConfiguration),
+ this.evaluatorProcess== null ?
+ Optional.<JVMClientProcess>empty() :
+ Optional.of(this.evaluatorProcess),
+ this.addFileList.size() == 0 ?
+ Optional.<List<File>>empty() :
+ Optional.of(this.addFileList),
+ this.addLibraryList.size() == 0 ?
+ Optional.<List<File>>empty() :
+ Optional.of(this.addLibraryList));
+ }
+
+ @Override
+ public void submitContext(final Configuration contextConfiguration) {
+
+ this.driverServiceClient.onEvaluatorSubmit(
+ getId(),
+ Optional.of(contextConfiguration),
+ Optional.<Configuration>empty(),
+ this.evaluatorProcess== null ?
+ Optional.<JVMClientProcess>empty() :
+ Optional.of(this.evaluatorProcess),
+ this.addFileList.size() == 0 ?
+ Optional.<List<File>>empty() :
+ Optional.of(this.addFileList),
+ this.addLibraryList.size() == 0 ?
+ Optional.<List<File>>empty() :
+ Optional.of(this.addLibraryList));
+ }
+
+ @Override
+ public void submitContextAndService(
+ final Configuration contextConfiguration,
+ final Configuration serviceConfiguration) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void submitContextAndTask(
+ final Configuration contextConfiguration,
+ final Configuration taskConfiguration) {
+
+ this.driverServiceClient.onEvaluatorSubmit(
+ getId(),
+ Optional.of(contextConfiguration),
+ Optional.of(taskConfiguration),
+ this.evaluatorProcess== null ?
+ Optional.<JVMClientProcess>empty() :
+ Optional.of(this.evaluatorProcess),
+ this.addFileList.size() == 0 ?
+ Optional.<List<File>>empty() :
+ Optional.of(this.addFileList),
+ this.addLibraryList.size() == 0 ?
+ Optional.<List<File>>empty() :
+ Optional.of(this.addLibraryList));
+ }
+
+ @Override
+ public void submitContextAndServiceAndTask(
+ final Configuration contextConfiguration,
+ final Configuration serviceConfiguration,
+ final Configuration taskConfiguration) {
+ throw new UnsupportedOperationException();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ClosedContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ClosedContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ClosedContextBridge.java
new file mode 100644
index 0000000..4527586
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ClosedContextBridge.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client.events;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+/**
+ * Closed context bridge.
+ */
+@Private
+public final class ClosedContextBridge implements ClosedContext {
+
+ private final String contextId;
+
+ private final String evaluatorId;
+
+ private final ActiveContext parentContext;
+
+ private final EvaluatorDescriptor evaluatorDescriptor;
+
+ public ClosedContextBridge(
+ final String contextId,
+ final String evaluatorId,
+ final ActiveContext parentContext,
+ final EvaluatorDescriptor evaluatorDescriptor) {
+ this.contextId = contextId;
+ this.evaluatorId = evaluatorId;
+ this.parentContext = parentContext;
+ this.evaluatorDescriptor = evaluatorDescriptor;
+ }
+
+ @Override
+ public ActiveContext getParentContext() {
+ return this.parentContext;
+ }
+
+ @Override
+ public String getId() {
+ return this.contextId;
+ }
+
+ @Override
+ public String getEvaluatorId() {
+ return this.evaluatorId;
+ }
+
+ @Override
+ public Optional<String> getParentId() {
+ return Optional.of(this.parentContext.getId());
+ }
+
+ @Override
+ public EvaluatorDescriptor getEvaluatorDescriptor() {
+ return this.evaluatorDescriptor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedEvaluatorBridge.java
new file mode 100644
index 0000000..4718fc2
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedEvaluatorBridge.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client.events;
+
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+
+/**
+ * Completed Evaluator bridge.
+ */
+public final class CompletedEvaluatorBridge implements CompletedEvaluator {
+
+ private final String id;
+
+ public CompletedEvaluatorBridge(final String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getId() {
+ return this.id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedTaskBridge.java
new file mode 100644
index 0000000..77d2379
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedTaskBridge.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client.events;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.CompletedTask;
+
+/**
+ * Completed task bridge.
+ */
+@Private
+public final class CompletedTaskBridge implements CompletedTask {
+
+ private final String taskId;
+
+ private final ActiveContext context;
+
+ private final byte[] result;
+
+ public CompletedTaskBridge(
+ final String taskId,
+ final ActiveContext context,
+ final byte[] result) {
+ this.taskId = taskId;
+ this.context = context;
+ this.result = result;
+ }
+
+ @Override
+ public ActiveContext getActiveContext() {
+ return this.context;
+ }
+
+ @Override
+ public String getId() {
+ return this.taskId;
+ }
+
+ @Override
+ public byte[] get() {
+ return this.result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ContextMessageBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ContextMessageBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ContextMessageBridge.java
new file mode 100644
index 0000000..f208735
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ContextMessageBridge.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client.events;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ContextMessage;
+
+/**
+ * Context message bridge.
+ */
+@Private
+public final class ContextMessageBridge implements ContextMessage {
+
+ private final String contextId;
+
+ private final String messageSourceId;
+
+ private final long sequenceNumber;
+
+ private final byte[] message;
+
+ public ContextMessageBridge(
+ final String contextId,
+ final String messageSourceId,
+ final long sequenceNumber,
+ final byte[] message) {
+ this.contextId = contextId;
+ this.messageSourceId = messageSourceId;
+ this.sequenceNumber = sequenceNumber;
+ this.message = message;
+ }
+
+ @Override
+ public byte[] get() {
+ return this.message;
+ }
+
+ @Override
+ public String getId() {
+ return this.contextId;
+ }
+
+ @Override
+ public String getMessageSourceID() {
+ return this.messageSourceId;
+ }
+
+ @Override
+ public long getSequenceNumber() {
+ return this.sequenceNumber;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java
new file mode 100644
index 0000000..1c315bb
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client.events;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.util.Optional;
+
+/**
+ * Failed context bridge.
+ */
+public final class FailedContextBridge implements FailedContext {
+
+ private final String contextId;
+
+ private final String evaluatorId;
+
+ private final String message;
+
+ private final EvaluatorDescriptor evaluatorDescriptor;
+
+ private final Optional<ActiveContext> parentContext;
+
+ private final Optional<byte[]> data;
+
+ public FailedContextBridge(
+ final String contextId,
+ final String evaluatorId,
+ final String message,
+ final EvaluatorDescriptor evaluatorDescriptor,
+ final Optional<ActiveContext> parentContext,
+ final Optional<byte[]> data) {
+ this.contextId = contextId;
+ this.evaluatorId = evaluatorId;
+ this.message = message;
+ this.evaluatorDescriptor = evaluatorDescriptor;
+ this.parentContext = parentContext;
+ this.data = data;
+ }
+
+ @Override
+ public Optional<ActiveContext> getParentContext() {
+ return this.parentContext;
+ }
+
+ @Override
+ public String getMessage() {
+ return this.message;
+ }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.of(message);
+ }
+
+ @Override
+ public Optional<Throwable> getReason() {
+ return Optional.<Throwable>of(new EvaluatorException(this.evaluatorId, this.message));
+ }
+
+ @Override
+ public Optional<byte[]> getData() {
+ return this.data;
+ }
+
+ @Override
+ public Throwable asError() {
+ return new EvaluatorException(this.evaluatorId, this.message);
+ }
+
+ @Override
+ public String getEvaluatorId() {
+ return this.evaluatorId;
+ }
+
+ @Override
+ public Optional<String> getParentId() {
+ return this.parentContext.isPresent() ?
+ Optional.of(this.parentContext.get().getId()) : Optional.<String>empty();
+ }
+
+ @Override
+ public EvaluatorDescriptor getEvaluatorDescriptor() {
+ return this.evaluatorDescriptor;
+ }
+
+ @Override
+ public String getId() {
+ return this.contextId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedEvaluatorBridge.java
new file mode 100644
index 0000000..64b268e
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedEvaluatorBridge.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client.events;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.util.Optional;
+
+import java.util.List;
+
+/**
+ * Failed Evaluator bridge.
+ */
+@Private
+public final class FailedEvaluatorBridge implements FailedEvaluator {
+
+ private final String id;
+
+ private final EvaluatorException evaluatorException;
+
+ private final List<FailedContext> failedContextList;
+
+ private Optional<FailedTask> failedTask;
+
+ public FailedEvaluatorBridge(
+ final String id,
+ final EvaluatorException evaluatorException,
+ final List<FailedContext> failedContextList,
+ final Optional<FailedTask> failedTask) {
+ this.id = id;
+ this.evaluatorException = evaluatorException;
+ this.failedContextList = failedContextList;
+ this.failedTask = failedTask;
+ }
+
+ @Override
+ public EvaluatorException getEvaluatorException() {
+ return this.evaluatorException;
+ }
+
+ @Override
+ public List<FailedContext> getFailedContextList() {
+ return this.failedContextList;
+ }
+
+ @Override
+ public Optional<FailedTask> getFailedTask() {
+ return this.failedTask;
+ }
+
+ @Override
+ public String getId() {
+ return this.id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java
new file mode 100644
index 0000000..d6d3f5e
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client.events;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.bridge.driver.client.IDriverServiceClient;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
+import org.apache.reef.util.Optional;
+
+/**
+ * Running task bridge.
+ */
+@Private
+public final class RunningTaskBridge implements RunningTask {
+
+ private final IDriverServiceClient driverServiceClient;
+
+ private final String taskId;
+
+ private final ActiveContext context;
+
+
+ public RunningTaskBridge(
+ final IDriverServiceClient driverServiceClient,
+ final String taskId,
+ final ActiveContext context) {
+ this.driverServiceClient = driverServiceClient;
+ this.taskId = taskId;
+ this.context = context;
+ }
+
+ @Override
+ public ActiveContext getActiveContext() {
+ return this.context;
+ }
+
+ @Override
+ public void send(final byte[] message) {
+ this.driverServiceClient.onTaskMessage(this.taskId, message);
+ }
+
+ @Override
+ public void suspend(final byte[] message) {
+ throw new UnsupportedOperationException("Suspend task not supported");
+ }
+
+ @Override
+ public void suspend() {
+ throw new UnsupportedOperationException("Suspend task not supported");
+ }
+
+ @Override
+ public void close(final byte[] message) {
+ this.driverServiceClient.onTaskClose(this.taskId, Optional.of(message));
+ }
+
+ @Override
+ public void close() {
+ this.driverServiceClient.onTaskClose(this.taskId, Optional.<byte[]>empty());
+ }
+
+ @Override
+ public TaskRepresenter getTaskRepresenter() {
+ throw new UnsupportedOperationException("Not a public API");
+ }
+
+ @Override
+ public String getId() {
+ return this.taskId;
+ }
+}