You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:13 UTC
[40/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverExceptionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverExceptionHandler.java
new file mode 100644
index 0000000..d145246
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverExceptionHandler.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Exception handler for exceptions thrown by client code in the Driver.
+ * It uses the JobMessageObserverImpl to send an update to the client and die.
+ */
+@Private
+@DriverSide
+public final class DriverExceptionHandler implements EventHandler<Throwable> {
+ private static final Logger LOG = Logger.getLogger(DriverExceptionHandler.class.getName());
+ /**
+ * We delegate the failures to this object.
+ */
+ private final DriverStatusManager driverStatusManager;
+
+ @Inject
+ public DriverExceptionHandler(final DriverStatusManager driverStatusManager) {
+ this.driverStatusManager = driverStatusManager;
+ LOG.log(Level.FINE, "Instantiated 'DriverExceptionHandler'");
+ }
+
+
+ @Override
+ public void onNext(final Throwable throwable) {
+ this.driverStatusManager.onError(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfiguration.java
new file mode 100644
index 0000000..a2e4078
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfiguration.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.parameters.DriverIdleSources;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl;
+import org.apache.reef.runtime.common.driver.client.ClientManager;
+import org.apache.reef.runtime.common.driver.client.JobMessageObserverImpl;
+import org.apache.reef.runtime.common.driver.idle.ClockIdlenessSource;
+import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorHandler;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationHandler;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceManagerStatus;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusHandler;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.wake.time.Clock;
+
+@Private
+@ClientSide
+public final class DriverRuntimeConfiguration extends ConfigurationModuleBuilder {
+
+ public static final ConfigurationModule CONF = new DriverRuntimeConfiguration()
+ // Resource Catalog
+ .bindImplementation(ResourceCatalog.class, ResourceCatalogImpl.class)
+
+ // JobMessageObserver
+ .bindImplementation(EvaluatorRequestor.class, EvaluatorRequestorImpl.class) // requesting evaluators
+ .bindImplementation(JobMessageObserver.class, JobMessageObserverImpl.class) // sending message to job client
+
+ // Client manager
+ .bindNamedParameter(DriverRuntimeConfigurationOptions.JobControlHandler.class, ClientManager.class)
+
+ // Bind the resourcemanager parameters
+ .bindNamedParameter(RuntimeParameters.NodeDescriptorHandler.class, NodeDescriptorHandler.class)
+ .bindNamedParameter(RuntimeParameters.ResourceAllocationHandler.class, ResourceAllocationHandler.class)
+ .bindNamedParameter(RuntimeParameters.ResourceStatusHandler.class, ResourceStatusHandler.class)
+ .bindNamedParameter(RuntimeParameters.RuntimeStatusHandler.class, ResourceManagerStatus.class)
+
+ // Bind to the Clock
+ .bindSetEntry(Clock.RuntimeStartHandler.class, DriverRuntimeStartHandler.class)
+ .bindSetEntry(Clock.RuntimeStopHandler.class, DriverRuntimeStopHandler.class)
+
+ // Bind the idle handlers
+ .bindSetEntry(DriverIdleSources.class, ClockIdlenessSource.class)
+ .bindSetEntry(DriverIdleSources.class, EventHandlerIdlenessSource.class)
+ .bindSetEntry(DriverIdleSources.class, ResourceManagerStatus.class)
+ .bindSetEntry(Clock.IdleHandler.class, ClockIdlenessSource.class)
+
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfigurationOptions.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfigurationOptions.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfigurationOptions.java
new file mode 100644
index 0000000..ae26f8e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfigurationOptions.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Houses the Driver resourcemanager configuration's NamedParameters
+ */
+public class DriverRuntimeConfigurationOptions {
+ @NamedParameter(doc = "Called when a job control message is received by the client.")
+ public final static class JobControlHandler implements Name<EventHandler<ClientRuntimeProtocol.JobControlProto>> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
new file mode 100644
index 0000000..114981b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorHeartbeatHandler;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorResourceManagerErrorHandler;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceManagerStatus;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The RuntimeStart handler of the Driver.
+ * <p/>
+ * This instantiates the DriverSingletons upon construction. Upon onNext(), it sets the resource manager status and
+ * wires up the remote event handler connections to the client and the evaluators.
+ */
+final class DriverRuntimeStartHandler implements EventHandler<RuntimeStart> {
+ private static final Logger LOG = Logger.getLogger(DriverRuntimeStartHandler.class.getName());
+ private final RemoteManager remoteManager;
+ private final EvaluatorResourceManagerErrorHandler evaluatorResourceManagerErrorHandler;
+ private final EvaluatorHeartbeatHandler evaluatorHeartbeatHandler;
+ private final ResourceManagerStatus resourceManagerStatus;
+ private final DriverStatusManager driverStatusManager;
+
+ /**
+ * @param singletons the objects we want to be Singletons in the Driver
+ * @param remoteManager the remoteManager in the Driver.
+ * @param evaluatorResourceManagerErrorHandler This will be wired up to the remoteManager on onNext()
+ * @param evaluatorHeartbeatHandler This will be wired up to the remoteManager on onNext()
+ * @param resourceManagerStatus will be set to RUNNING in onNext()
+ * @param driverStatusManager will be set to RUNNING in onNext()
+ */
+ @Inject
+ DriverRuntimeStartHandler(final DriverSingletons singletons,
+ final RemoteManager remoteManager,
+ final EvaluatorResourceManagerErrorHandler evaluatorResourceManagerErrorHandler,
+ final EvaluatorHeartbeatHandler evaluatorHeartbeatHandler,
+ final ResourceManagerStatus resourceManagerStatus,
+ final DriverStatusManager driverStatusManager) {
+ this.remoteManager = remoteManager;
+ this.evaluatorResourceManagerErrorHandler = evaluatorResourceManagerErrorHandler;
+ this.evaluatorHeartbeatHandler = evaluatorHeartbeatHandler;
+ this.resourceManagerStatus = resourceManagerStatus;
+ this.driverStatusManager = driverStatusManager;
+ }
+
+ @Override
+ public synchronized void onNext(final RuntimeStart runtimeStart) {
+ LOG.log(Level.FINEST, "RuntimeStart: {0}", runtimeStart);
+
+ this.remoteManager.registerHandler(EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.class, evaluatorHeartbeatHandler);
+ this.remoteManager.registerHandler(ReefServiceProtos.RuntimeErrorProto.class, evaluatorResourceManagerErrorHandler);
+ this.resourceManagerStatus.setRunning();
+ this.driverStatusManager.onRunning();
+ LOG.log(Level.FINEST, "DriverRuntimeStartHandler complete.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
new file mode 100644
index 0000000..70b1f97
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Handler for the RuntimeStop event in the Driver. It shuts down the evaluators and the RemoteManager and
+ * informs the Client.
+ */
+@Private
+@DriverSide
+final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
+ private static final Logger LOG = Logger.getLogger(DriverRuntimeStopHandler.class.getName());
+
+ private final DriverStatusManager driverStatusManager;
+ private final RemoteManager remoteManager;
+ private final Evaluators evaluators;
+
+ @Inject
+ DriverRuntimeStopHandler(final DriverStatusManager driverStatusManager,
+ final RemoteManager remoteManager,
+ final Evaluators evaluators) {
+ this.driverStatusManager = driverStatusManager;
+ this.remoteManager = remoteManager;
+ this.evaluators = evaluators;
+ }
+
+ @Override
+ public synchronized void onNext(final RuntimeStop runtimeStop) {
+ LOG.log(Level.FINEST, "RuntimeStop: {0}", runtimeStop);
+ // Shutdown the Evaluators.
+ this.evaluators.close();
+ // Inform the client of the shutdown.
+ final Optional<Throwable> exception = Optional.<Throwable>ofNullable(runtimeStop.getException());
+ this.driverStatusManager.sendJobEndingMessageToClient(exception);
+ // Close the remoteManager.
+ try {
+ this.remoteManager.close();
+ LOG.log(Level.INFO, "Driver shutdown complete");
+ } catch (final Exception e) {
+ throw new RuntimeException("Unable to close the RemoteManager.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingleton.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingleton.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingleton.java
new file mode 100644
index 0000000..51da6c0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingleton.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
new file mode 100644
index 0000000..6ab42e9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+/**
+ * A class that depends on all objects we want to enforce to be singletons in the Driver.
+ * The DriverRuntimeStartHandler depends on an instance of this class, which instantiates its dependencies.
+ */
+@DriverSide
+@Private
+final class DriverSingletons {
+ @Inject
+ DriverSingletons(
+ // Application event handlers
+ final @Parameter(ContextActiveHandlers.class) Set<EventHandler<ActiveContext>> contextActiveEventHandlers,
+ final @Parameter(ContextClosedHandlers.class) Set<EventHandler<ClosedContext>> contextClosedEventHandlers,
+ final @Parameter(ContextFailedHandlers.class) Set<EventHandler<FailedContext>> contextFailedEventHandlers,
+ final @Parameter(ContextMessageHandlers.class) Set<EventHandler<ContextMessage>> contextMessageHandlers,
+ final @Parameter(TaskRunningHandlers.class) Set<EventHandler<RunningTask>> taskRunningEventHandlers,
+ final @Parameter(TaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> taskCompletedEventHandlers,
+ final @Parameter(TaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> taskSuspendedEventHandlers,
+ final @Parameter(TaskMessageHandlers.class) Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
+ final @Parameter(TaskFailedHandlers.class) Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
+ final @Parameter(EvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedEventHandlers,
+ final @Parameter(EvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
+ final @Parameter(EvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
+
+ // Service event handlers
+ final @Parameter(ServiceContextActiveHandlers.class) Set<EventHandler<ActiveContext>> serviceContextActiveEventHandlers,
+ final @Parameter(ServiceContextClosedHandlers.class) Set<EventHandler<ClosedContext>> serviceContextClosedEventHandlers,
+ final @Parameter(ServiceContextFailedHandlers.class) Set<EventHandler<FailedContext>> serviceContextFailedEventHandlers,
+ final @Parameter(ServiceContextMessageHandlers.class) Set<EventHandler<ContextMessage>> serviceContextMessageHandlers,
+ final @Parameter(ServiceTaskRunningHandlers.class) Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers,
+ final @Parameter(ServiceTaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers,
+ final @Parameter(ServiceTaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers,
+ final @Parameter(ServiceTaskMessageHandlers.class) Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers,
+ final @Parameter(ServiceTaskFailedHandlers.class) Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers,
+ final @Parameter(ServiceEvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers,
+ final @Parameter(ServiceEvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers,
+ final @Parameter(ServiceEvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers,
+
+ // Client event handler
+ final @Parameter(DriverRuntimeConfigurationOptions.JobControlHandler.class) EventHandler<ClientRuntimeProtocol.JobControlProto> jobControlHandler,
+
+ // Resource*Handlers - Should be invoked once
+ // The YarnResourceLaunchHandler creates and uploads
+ // the global jar file. If these handlers are
+ // instantiated for each container allocation
+ // we get container failures dure to modifications
+ // to already submitted global jar file
+ final ResourceLaunchHandler resourceLaunchHandler,
+ final ResourceReleaseHandler resourceReleaseHandler) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
new file mode 100644
index 0000000..a2f8f6f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.driver.parameters.DriverRestartHandler;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is bound to the start event of the clock and dispatches it to the approriate application code.
+ */
+public final class DriverStartHandler implements EventHandler<StartTime> {
+ private static final Logger LOG = Logger.getLogger(DriverStartHandler.class.getName());
+
+ private final Set<EventHandler<StartTime>> startHandlers;
+ private final Optional<EventHandler<StartTime>> restartHandler;
+ private final DriverStatusManager driverStatusManager;
+
+ @Inject
+ DriverStartHandler(final @Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class) Set<EventHandler<StartTime>> startHandler,
+ final @Parameter(DriverRestartHandler.class) EventHandler<StartTime> restartHandler,
+ final DriverStatusManager driverStatusManager) {
+ this.startHandlers = startHandler;
+ this.restartHandler = Optional.of(restartHandler);
+ this.driverStatusManager = driverStatusManager;
+ LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandler [{0}] and RestartHandler [{1}]",
+ new String[]{this.startHandlers.toString(), this.restartHandler.toString()});
+ }
+
+ @Inject
+ DriverStartHandler(final @Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class) Set<EventHandler<StartTime>> startHandler,
+ final DriverStatusManager driverStatusManager) {
+ this.startHandlers = startHandler;
+ this.restartHandler = Optional.empty();
+ this.driverStatusManager = driverStatusManager;
+ LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandler [{0}] and no RestartHandler",
+ this.startHandlers.toString());
+ }
+
+ @Override
+ public void onNext(final StartTime startTime) {
+ if (isRestart()) {
+ this.onRestart(startTime);
+ } else {
+ this.onStart(startTime);
+ }
+ }
+
+ private void onRestart(final StartTime startTime) {
+ if (restartHandler.isPresent()) {
+ this.restartHandler.get().onNext(startTime);
+ } else {
+ // TODO: We might have to indicate this to YARN somehow such that it doesn't try another time.
+ throw new RuntimeException("Driver restart happened, but no ON_DRIVER_RESTART handler is bound.");
+ }
+ }
+
+ private void onStart(final StartTime startTime) {
+ for (final EventHandler<StartTime> startHandler : this.startHandlers) {
+ startHandler.onNext(startTime);
+ }
+ }
+
+ /**
+ * @return true, if the Driver is in fact being restarted.
+ */
+ private boolean isRestart() {
+ return this.driverStatusManager.getNumPreviousContainers() > 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
new file mode 100644
index 0000000..600092d
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+/**
+ * The status of the Driver.
+ */
+public enum DriverStatus {
+ PRE_INIT,
+ INIT,
+ RUNNING,
+ SHUTTING_DOWN,
+ FAILING
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
new file mode 100644
index 0000000..e5e544c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.driver.client.ClientConnection;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.time.Clock;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages the Driver's status.
+ */
+public final class DriverStatusManager {
+ private static final Logger LOG = Logger.getLogger(DriverStatusManager.class.getName());
+ private final Clock clock;
+ private final ClientConnection clientConnection;
+ private final String jobIdentifier;
+ private final ExceptionCodec exceptionCodec;
+ private DriverStatus driverStatus = DriverStatus.PRE_INIT;
+ private Optional<Throwable> shutdownCause = Optional.empty();
+ private boolean driverTerminationHasBeenCommunicatedToClient = false;
+ private boolean restartCompleted = false;
+ private int numPreviousContainers = -1;
+ private int numRecoveredContainers = 0;
+
+
+ /**
+ * @param clock
+ * @param clientConnection
+ * @param jobIdentifier
+ * @param exceptionCodec
+ */
+ @Inject
+ DriverStatusManager(final Clock clock,
+ final ClientConnection clientConnection,
+ final @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) String jobIdentifier,
+ final ExceptionCodec exceptionCodec) {
+ LOG.entering(DriverStatusManager.class.getCanonicalName(), "<init>");
+ this.clock = clock;
+ this.clientConnection = clientConnection;
+ this.jobIdentifier = jobIdentifier;
+ this.exceptionCodec = exceptionCodec;
+ LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'");
+ LOG.exiting(DriverStatusManager.class.getCanonicalName(), "<init>");
+ }
+
+ /**
+ * Check whether a state transition 'from->to' is legal.
+ *
+ * @param from
+ * @param to
+ * @return
+ */
+ private static boolean isLegalTransition(final DriverStatus from, final DriverStatus to) {
+ switch (from) {
+ case PRE_INIT:
+ switch (to) {
+ case INIT:
+ return true;
+ default:
+ return false;
+ }
+ case INIT:
+ switch (to) {
+ case RUNNING:
+ return true;
+ default:
+ return false;
+ }
+ case RUNNING:
+ switch (to) {
+ case SHUTTING_DOWN:
+ case FAILING:
+ return true;
+ default:
+ return false;
+ }
+ case FAILING:
+ case SHUTTING_DOWN:
+ return false;
+ default:
+ throw new IllegalStateException("Unknown input state: " + from);
+ }
+ }
+
+ /**
+ * Changes the driver status to INIT and sends message to the client about the transition.
+ */
+ public synchronized void onInit() {
+ LOG.entering(DriverStatusManager.class.getCanonicalName(), "onInit");
+ this.clientConnection.send(this.getInitMessage());
+ this.setStatus(DriverStatus.INIT);
+ LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onInit");
+ }
+
+ /**
+ * Changes the driver status to RUNNING and sends message to the client about the transition.
+ * If the driver is in status 'PRE_INIT', this first calls onInit();
+ */
+ public synchronized void onRunning() {
+ LOG.entering(DriverStatusManager.class.getCanonicalName(), "onRunning");
+ if (this.driverStatus.equals(DriverStatus.PRE_INIT)) {
+ this.onInit();
+ }
+ this.clientConnection.send(this.getRunningMessage());
+ this.setStatus(DriverStatus.RUNNING);
+ LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onRunning");
+ }
+
+ /**
+ * End the Driver with an exception.
+ *
+ * @param exception
+ */
+ public synchronized void onError(final Throwable exception) {
+ LOG.entering(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception});
+ if (this.isShuttingDownOrFailing()) {
+ LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception);
+ } else {
+ LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception);
+ this.shutdownCause = Optional.of(exception);
+ this.clock.stop();
+ this.setStatus(DriverStatus.FAILING);
+ }
+ LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception});
+ }
+
+ /**
+ * Perform a clean shutdown of the Driver.
+ */
+ public synchronized void onComplete() {
+ LOG.entering(DriverStatusManager.class.getCanonicalName(), "onComplete");
+ if (this.isShuttingDownOrFailing()) {
+ LOG.log(Level.WARNING, "Ignoring second call to onComplete()");
+ } else {
+ LOG.log(Level.INFO, "Clean shutdown of the Driver.");
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST, "Callstack: ", new Exception());
+ }
+ this.clock.close();
+ this.setStatus(DriverStatus.SHUTTING_DOWN);
+ }
+ LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onComplete");
+
+ }
+
+ /**
+ * Sends the final message to the Driver. This is used by DriverRuntimeStopHandler.onNext().
+ *
+ * @param exception
+ */
+ public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) {
+ if (this.isNotShuttingDownOrFailing()) {
+ LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. This is likely a illegal call to clock.close() at play. Current state: " + this.driverStatus);
+ }
+ if (this.driverTerminationHasBeenCommunicatedToClient) {
+ LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. Ignoring the second call");
+ } else {
+ { // Log the shutdown situation
+ if (this.shutdownCause.isPresent()) {
+ LOG.log(Level.WARNING, "Sending message about an unclean driver shutdown.", this.shutdownCause.get());
+ }
+ if (exception.isPresent()) {
+ LOG.log(Level.WARNING, "There was an exception during clock.close().", exception.get());
+ }
+ if (this.shutdownCause.isPresent() && exception.isPresent()) {
+ LOG.log(Level.WARNING, "The driver is shutdown because of an exception (see above) and there was an exception during clock.close(). Only the first exception will be sent to the client");
+ }
+ }
+ if (this.shutdownCause.isPresent()) {
+ // Send the earlier exception, if there was one
+ this.clientConnection.send(getJobEndingMessage(this.shutdownCause));
+ } else {
+ // Send the exception passed, if there was one.
+ this.clientConnection.send(getJobEndingMessage(exception));
+ }
+ this.driverTerminationHasBeenCommunicatedToClient = true;
+ }
+ }
+
+ /**
+ * Indicate that the Driver restart is complete. It is meant to be called exactly once during a restart and never
+ * during the ininital launch of a Driver.
+ */
+ public synchronized void setRestartCompleted() {
+ if (!this.isDriverRestart()) {
+ throw new IllegalStateException("setRestartCompleted() called in a Driver that is not, in fact, restarted.");
+ } else if (this.restartCompleted) {
+ LOG.log(Level.WARNING, "Calling setRestartCompleted more than once.");
+ } else {
+ this.restartCompleted = true;
+ }
+ }
+
+ /**
+ * @return the number of Evaluators expected to check in from a previous run.
+ */
+ public synchronized int getNumPreviousContainers() {
+ return this.numPreviousContainers;
+ }
+
+ /**
+ * Set the number of containers to expect still active from a previous execution of the Driver in a restart situation.
+ * To be called exactly once during a driver restart.
+ *
+ * @param num
+ */
+ public synchronized void setNumPreviousContainers(final int num) {
+ if (this.numPreviousContainers >= 0) {
+ throw new IllegalStateException("Attempting to set the number of expected containers left from a previous container more than once.");
+ } else {
+ this.numPreviousContainers = num;
+ }
+ }
+
+ /**
+ * @return the number of Evaluators from a previous Driver that have checked in with the Driver in a restart situation.
+ */
+ public synchronized int getNumRecoveredContainers() {
+ return this.numRecoveredContainers;
+ }
+
+ /**
+ * Indicate that this Driver has re-established the connection with one more Evaluator of a previous run.
+ */
+ public synchronized void oneContainerRecovered() {
+ this.numRecoveredContainers += 1;
+ if (this.numRecoveredContainers > this.numPreviousContainers) {
+ throw new IllegalStateException("Reconnected to" +
+ this.numRecoveredContainers +
+ "Evaluators while only expecting " +
+ this.numPreviousContainers);
+ }
+ }
+
+ /**
+ * @return true if the Driver is a restarted driver of an earlier attempt.
+ */
+ private synchronized boolean isDriverRestart() {
+ return this.getNumPreviousContainers() > 0;
+ }
+
+ public synchronized boolean isShuttingDownOrFailing() {
+ return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus)
+ || DriverStatus.FAILING.equals(this.driverStatus);
+ }
+
+ private synchronized boolean isNotShuttingDownOrFailing() {
+ return !isShuttingDownOrFailing();
+ }
+
+ /**
+ * Helper method to set the status. This also checks whether the transition from the current status to the new one is
+ * legal.
+ *
+ * @param newStatus
+ */
+ private synchronized void setStatus(final DriverStatus newStatus) {
+ if (isLegalTransition(this.driverStatus, newStatus)) {
+ this.driverStatus = newStatus;
+ } else {
+ LOG.log(Level.WARNING, "Illegal state transiton: '" + this.driverStatus + "'->'" + newStatus + "'");
+ }
+ }
+
+ /**
+ * @param exception the exception that ended the Driver, if any.
+ * @return message to be sent to the client at the end of the job.
+ */
+ private synchronized ReefServiceProtos.JobStatusProto getJobEndingMessage(final Optional<Throwable> exception) {
+ final ReefServiceProtos.JobStatusProto message;
+ if (exception.isPresent()) {
+ message = ReefServiceProtos.JobStatusProto.newBuilder()
+ .setIdentifier(this.jobIdentifier)
+ .setState(ReefServiceProtos.State.FAILED)
+ .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get())))
+ .build();
+ } else {
+ message = ReefServiceProtos.JobStatusProto.newBuilder()
+ .setIdentifier(this.jobIdentifier)
+ .setState(ReefServiceProtos.State.DONE)
+ .build();
+ }
+ return message;
+ }
+
+ /**
+ * @return The message to be sent through the ClientConnection when in state INIT.
+ */
+ private synchronized ReefServiceProtos.JobStatusProto getInitMessage() {
+ return ReefServiceProtos.JobStatusProto.newBuilder()
+ .setIdentifier(this.jobIdentifier)
+ .setState(ReefServiceProtos.State.INIT)
+ .build();
+ }
+
+ /**
+ * @return The message to be sent through the ClientConnection when in state RUNNING.
+ */
+ private synchronized ReefServiceProtos.JobStatusProto getRunningMessage() {
+ return ReefServiceProtos.JobStatusProto.newBuilder()
+ .setIdentifier(this.jobIdentifier)
+ .setState(ReefServiceProtos.State.RUNNING)
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
new file mode 100644
index 0000000..940de3c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * Implementation of the EvaluatorRequestor that translates the request and hands it down to the underlying RM.
+ */
+public final class EvaluatorRequestorImpl implements EvaluatorRequestor {
+
+ private static final Logger LOG = Logger.getLogger(EvaluatorRequestorImpl.class.getName());
+
+ private final ResourceCatalog resourceCatalog;
+ private final ResourceRequestHandler resourceRequestHandler;
+ private final LoggingScopeFactory loggingScopeFactory;
+
+ /**
+ * @param resourceCatalog
+ * @param resourceRequestHandler
+ */
+ @Inject
+ public EvaluatorRequestorImpl(final ResourceCatalog resourceCatalog,
+ final ResourceRequestHandler resourceRequestHandler,
+ final LoggingScopeFactory loggingScopeFactory) {
+ this.resourceCatalog = resourceCatalog;
+ this.resourceRequestHandler = resourceRequestHandler;
+ this.loggingScopeFactory = loggingScopeFactory;
+ }
+
+ @Override
+ public synchronized void submit(final EvaluatorRequest req) {
+ LOG.log(Level.FINEST, "Got an EvaluatorRequest: number: {0}, memory = {1}, cores = {2}.", new Object[]{req.getNumber(), req.getMegaBytes(), req.getNumberOfCores()});
+
+ if (req.getMegaBytes() <= 0) {
+ throw new IllegalArgumentException("Given an unsupported memory size: " + req.getMegaBytes());
+ }
+ if (req.getNumberOfCores() <= 0) {
+ throw new IllegalArgumentException("Given an unsupported core number: " + req.getNumberOfCores());
+ }
+ if (req.getNumber() <= 0) {
+ throw new IllegalArgumentException("Given an unsupported number of evaluators: " + req.getNumber());
+ }
+
+ try (LoggingScope ls = loggingScopeFactory.evaluatorSubmit(req.getNumber())) {
+ final DriverRuntimeProtocol.ResourceRequestProto.Builder request = DriverRuntimeProtocol.ResourceRequestProto
+ .newBuilder()
+ .setResourceCount(req.getNumber())
+ .setVirtualCores(req.getNumberOfCores())
+ .setMemorySize(req.getMegaBytes());
+
+ final ResourceCatalog.Descriptor descriptor = req.getDescriptor();
+ if (descriptor != null) {
+ if (descriptor instanceof RackDescriptor) {
+ request.addRackName(descriptor.getName());
+ } else if (descriptor instanceof NodeDescriptor) {
+ request.addNodeName(descriptor.getName());
+ } else {
+ throw new IllegalArgumentException("Unable to operate on descriptors of type " + descriptor.getClass().getName());
+ }
+ }
+
+ this.resourceRequestHandler.onNext(request.build());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/AbstractDriverRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/AbstractDriverRuntimeConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/AbstractDriverRuntimeConfiguration.java
new file mode 100644
index 0000000..b832d22
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/AbstractDriverRuntimeConfiguration.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.util.Builder;
+
+/**
+ * @deprecated Runtimes are advised to create their own ConfigurationModules instead of subclassing this class.
+ */
+@Deprecated
+public abstract class AbstractDriverRuntimeConfiguration implements Builder<Configuration> {
+
+ protected JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
+
+ protected AbstractDriverRuntimeConfiguration(
+ final Class<? extends ResourceLaunchHandler> resourceLaunchHandlerClass,
+ final Class<? extends ResourceReleaseHandler> resourceReleaseHandlerClass,
+ final Class<? extends ResourceRequestHandler> resourceRequestHandlerClass) {
+ try {
+ this.builder.bind(ResourceLaunchHandler.class, resourceLaunchHandlerClass);
+ this.builder.bind(ResourceReleaseHandler.class, resourceReleaseHandlerClass);
+ this.builder.bind(ResourceRequestHandler.class, resourceRequestHandlerClass);
+
+ } catch (final BindException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public final Configuration build() {
+ return this.builder.build();
+ }
+
+ public final AbstractDriverRuntimeConfiguration addClientConfiguration(final Configuration conf) {
+ try {
+ this.builder.addConfiguration(conf);
+ return this;
+ } catch (final BindException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public final AbstractDriverRuntimeConfiguration setJobIdentifier(final String id) {
+ try {
+ this.builder.bindNamedParameter(JobIdentifier.class, id.toString());
+ return this;
+ } catch (final BindException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public final AbstractDriverRuntimeConfiguration setClientRemoteIdentifier(final String rid) {
+ try {
+ this.builder.bindNamedParameter(ClientRemoteIdentifier.class, rid.toString());
+ return this;
+ } catch (final BindException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public final AbstractDriverRuntimeConfiguration setDriverProcessMemory(final int memory) {
+ try {
+ this.builder.bindNamedParameter(DriverProcessMemory.class, Integer.toString(memory));
+ return this;
+ } catch (final BindException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public final AbstractDriverRuntimeConfiguration setEvaluatorTimeout(final long value) {
+ try {
+ this.builder.bindNamedParameter(EvaluatorTimeout.class, Long.toString(value));
+ return this;
+ } catch (final BindException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @NamedParameter(doc = "The job identifier.")
+ public final static class JobIdentifier implements Name<String> {
+ }
+
+ @NamedParameter(doc = "The client remote identifier.", default_value = ClientRemoteIdentifier.NONE)
+ public final static class ClientRemoteIdentifier implements Name<String> {
+ /**
+ * Indicates that there is no Client.
+ */
+ public static final String NONE = ErrorHandlerRID.NONE;
+ }
+
+ @NamedParameter(doc = "The evaluator timeout (how long to wait before deciding an evaluator is dead.", default_value = "60000")
+ public final static class EvaluatorTimeout implements Name<Long> {
+ }
+
+ /**
+ * This parameter denotes that the driver process should actually be
+ * started in a separate process with the given amount of JVM memory.
+ */
+ @NamedParameter(doc = "The driver process memory.", default_value = "512")
+ public final static class DriverProcessMemory implements Name<Integer> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/DefaultResourceManagerLifeCycle.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/DefaultResourceManagerLifeCycle.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/DefaultResourceManagerLifeCycle.java
new file mode 100644
index 0000000..51da6c0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/DefaultResourceManagerLifeCycle.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java
new file mode 100644
index 0000000..65c3830
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ *
+ */
+@RuntimeAuthor
+public interface ResourceLaunchHandler extends EventHandler<DriverRuntimeProtocol.ResourceLaunchProto> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerLifeCycle.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerLifeCycle.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerLifeCycle.java
new file mode 100644
index 0000000..51da6c0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerLifeCycle.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java
new file mode 100644
index 0000000..581eb63
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ *
+ */
+@RuntimeAuthor
+public interface ResourceReleaseHandler extends EventHandler<DriverRuntimeProtocol.ResourceReleaseProto> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java
new file mode 100644
index 0000000..3d30ec8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * The evaluator request handler.
+ */
+@RuntimeAuthor
+public interface ResourceRequestHandler extends EventHandler<DriverRuntimeProtocol.ResourceRequestProto> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java
new file mode 100644
index 0000000..e847249
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Driver resourcemanager parameters that resourcemanager implementations use to communicate with
+ * the Driver.
+ */
+@RuntimeAuthor
+public final class RuntimeParameters {
+
+ @NamedParameter(doc = "The resource allocation handler that stub runtimes send along allocated resources e.g., containers.")
+ public final static class ResourceAllocationHandler implements Name<EventHandler<DriverRuntimeProtocol.ResourceAllocationProto>> {
+ }
+
+ @NamedParameter(doc = "The node descriptor handler that stub runtimes send along node information.")
+ public final static class NodeDescriptorHandler implements Name<EventHandler<DriverRuntimeProtocol.NodeDescriptorProto>> {
+ }
+
+ @NamedParameter(doc = "The resource status handler.")
+ public final static class ResourceStatusHandler implements Name<EventHandler<DriverRuntimeProtocol.ResourceStatusProto>> {
+ }
+
+ @NamedParameter(doc = "The resourcemanager status handler.")
+ public final static class RuntimeStatusHandler implements Name<EventHandler<DriverRuntimeProtocol.RuntimeStatusProto>> {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/package-info.java
new file mode 100644
index 0000000..50f52f6
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver-Side Event Handlers to be implemented by a specific resource manager.
+ */
+package org.apache.reef.runtime.common.driver.api;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/NodeDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/NodeDescriptorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/NodeDescriptorImpl.java
new file mode 100644
index 0000000..3cdf0d3
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/NodeDescriptorImpl.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.catalog;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+
+import java.net.InetSocketAddress;
+
+@Private
+public class NodeDescriptorImpl implements NodeDescriptor {
+
+ private final RackDescriptorImpl rack;
+
+ private final String id;
+
+ private final InetSocketAddress address;
+
+ private final int ram;
+
+ /**
+ * @param id
+ * @param address
+ * @param rack
+ * @param ram the RAM available to the machine, in MegaBytes.
+ */
+ NodeDescriptorImpl(final String id, final InetSocketAddress address, final RackDescriptorImpl rack, final int ram) {
+ this.id = id;
+ this.address = address;
+ this.rack = rack;
+ this.ram = ram;
+ this.rack.addNodeDescriptor(this);
+ }
+
+ @Override
+ public String toString() {
+ return "Node [" + this.address + "]: RACK " + this.rack.getName() + ", RAM " + ram;
+ }
+
+ @Override
+ public final String getId() {
+ return this.id;
+ }
+
+
+ @Override
+ public InetSocketAddress getInetSocketAddress() {
+ return this.address;
+ }
+
+ @Override
+ public RackDescriptor getRackDescriptor() {
+ return this.rack;
+ }
+
+ @Override
+ public String getName() {
+ return this.address.getHostName();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/RackDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/RackDescriptorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/RackDescriptorImpl.java
new file mode 100644
index 0000000..4256ee5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/RackDescriptorImpl.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.catalog;
+
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public final class RackDescriptorImpl implements RackDescriptor {
+
+ private final String name;
+
+
+ private final List<NodeDescriptorImpl> nodes;
+
+ RackDescriptorImpl(final String name) {
+ this.name = name;
+ this.nodes = new ArrayList<>();
+ }
+
+ public final String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("Rack " + this.name);
+ for (final NodeDescriptorImpl node : nodes) {
+ sb.append("\n\t" + node);
+ }
+ return sb.toString();
+ }
+
+ public final int hashCode() {
+ return this.name.hashCode();
+ }
+
+ public final boolean equals(final Object obj) {
+ if (obj instanceof RackDescriptorImpl) {
+ return obj.toString().equals(this.name);
+ } else {
+ return false;
+ }
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public List<NodeDescriptor> getNodes() {
+ return Collections.unmodifiableList(new ArrayList<NodeDescriptor>(this.nodes));
+ }
+
+ /**
+ * Should only be used from YarnNodeDescriptor constructor.
+ *
+ * @param node to add.
+ */
+ void addNodeDescriptor(final NodeDescriptorImpl node) {
+ this.nodes.add(node);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
new file mode 100644
index 0000000..6193af4
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.catalog;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Private
+public final class ResourceCatalogImpl implements ResourceCatalog {
+
+ public final static String DEFAULT_RACK = "/default-rack";
+ private static final Logger LOG = Logger.getLogger(ResourceCatalog.class.getName());
+ private final Map<String, RackDescriptorImpl> racks = new HashMap<>();
+
+ private final Map<String, NodeDescriptorImpl> nodes = new HashMap<>();
+
+ @Inject
+ ResourceCatalogImpl() {
+ LOG.log(Level.FINE, "Instantiated 'ResourceCatalogImpl'");
+ }
+
+ @Override
+ public synchronized String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("=== Resource Catalog ===");
+ for (final RackDescriptor rack : racks.values()) {
+ sb.append("\n" + rack);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public synchronized Collection<NodeDescriptor> getNodes() {
+ return Collections.unmodifiableCollection(new ArrayList<NodeDescriptor>(this.nodes.values()));
+ }
+
+ @Override
+ public synchronized Collection<RackDescriptor> getRacks() {
+ return Collections.unmodifiableCollection(new ArrayList<RackDescriptor>(this.racks.values()));
+ }
+
+ public synchronized final NodeDescriptor getNode(final String id) {
+ return this.nodes.get(id);
+ }
+
+ public synchronized final void handle(final NodeDescriptorProto node) {
+ final String rack_name = (node.hasRackName() ? node.getRackName() : DEFAULT_RACK);
+
+ LOG.log(Level.FINEST, "Catalog new node: id[{0}], rack[{1}], host[{2}], port[{3}], memory[{4}]",
+ new Object[]{node.getIdentifier(), rack_name, node.getHostName(), node.getPort(),
+ node.getMemorySize()}
+ );
+
+ if (!this.racks.containsKey(rack_name)) {
+ final RackDescriptorImpl rack = new RackDescriptorImpl(rack_name);
+ this.racks.put(rack_name, rack);
+ }
+ final RackDescriptorImpl rack = this.racks.get(rack_name);
+ final InetSocketAddress address = new InetSocketAddress(node.getHostName(), node.getPort());
+ final NodeDescriptorImpl nodeDescriptor = new NodeDescriptorImpl(node.getIdentifier(), address, rack, node.getMemorySize());
+ this.nodes.put(nodeDescriptor.getId(), nodeDescriptor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
new file mode 100644
index 0000000..f7640b9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the communication channel to the client.
+ */
+@DriverSide
+public final class ClientConnection {
+
+ private static final Logger LOG = Logger.getLogger(ClientConnection.class.getName());
+
+ private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler;
+ private final String jobIdentifier;
+
+ @Inject
+ public ClientConnection(
+ final RemoteManager remoteManager,
+ final @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) String clientRID,
+ final @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) String jobIdentifier) {
+ this.jobIdentifier = jobIdentifier;
+ if (clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) {
+ LOG.log(Level.FINE, "Instantiated 'ClientConnection' without an actual connection to the client.");
+ this.jobStatusHandler = new LoggingJobStatusHandler();
+ } else {
+ this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class);
+ LOG.log(Level.FINE, "Instantiated 'ClientConnection'");
+ }
+ }
+
+ /**
+ * Send the given JobStatus to the client.
+ *
+ * @param status
+ */
+ public synchronized void send(final ReefServiceProtos.JobStatusProto status) {
+ LOG.log(Level.FINEST, "Sending:\n" + status);
+ this.jobStatusHandler.onNext(status);
+ }
+
+ /**
+ * Send the given byte[] as a message to the client.
+ *
+ * @param message
+ */
+ public synchronized void sendMessage(final byte[] message) {
+ this.send(ReefServiceProtos.JobStatusProto.newBuilder()
+ .setIdentifier(this.jobIdentifier)
+ .setState(ReefServiceProtos.State.RUNNING)
+ .setMessage(ByteString.copyFrom(message))
+ .build());
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
new file mode 100644
index 0000000..2a686af
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.ClientCloseHandlers;
+import org.apache.reef.driver.parameters.ClientCloseWithMessageHandlers;
+import org.apache.reef.driver.parameters.ClientMessageHandlers;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.utils.BroadCastEventHandler;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the Client in the Driver.
+ */
+@Private
+@DriverSide
+public final class ClientManager implements EventHandler<ClientRuntimeProtocol.JobControlProto> {
+
+ private final static Logger LOG = Logger.getLogger(ClientManager.class.getName());
+
+
+ private final InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers;
+
+ private final InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers;
+
+ private final InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers;
+
+ private final DriverStatusManager driverStatusManager;
+
+ private volatile EventHandler<Void> clientCloseDispatcher;
+
+ private volatile EventHandler<byte[]> clientCloseWithMessageDispatcher;
+
+ private volatile EventHandler<byte[]> clientMessageDispatcher;
+
+
+ @Inject
+ ClientManager(final @Parameter(ClientCloseHandlers.class) InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers,
+ final @Parameter(ClientCloseWithMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers,
+ final @Parameter(ClientMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers,
+ final @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) String clientRID,
+ final RemoteManager remoteManager,
+ final DriverStatusManager driverStatusManager) {
+ this.driverStatusManager = driverStatusManager;
+ this.clientCloseHandlers = clientCloseHandlers;
+ this.clientCloseWithMessageHandlers = clientCloseWithMessageHandlers;
+ this.clientMessageHandlers = clientMessageHandlers;
+
+ if (!clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) {
+ remoteManager.registerHandler(clientRID, ClientRuntimeProtocol.JobControlProto.class, this);
+ } else {
+ LOG.log(Level.FINE, "Not registering a handler for JobControlProto, as there is no client.");
+ }
+ }
+
+ /**
+ * This method reacts to control messages passed by the client to the driver. It will forward
+ * messages related to the ClientObserver interface to the Driver. It will also initiate a shutdown
+ * if the client indicates a close message.
+ *
+ * @param jobControlProto contains the client initiated control message
+ */
+ @Override
+ public synchronized void onNext(final ClientRuntimeProtocol.JobControlProto jobControlProto) {
+ if (jobControlProto.hasSignal()) {
+ if (jobControlProto.getSignal() == ClientRuntimeProtocol.Signal.SIG_TERMINATE) {
+ try {
+ if (jobControlProto.hasMessage()) {
+ getClientCloseWithMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
+ } else {
+ getClientCloseDispatcher().onNext(null);
+ }
+ } finally {
+ this.driverStatusManager.onComplete();
+ }
+ } else {
+ LOG.log(Level.FINEST, "Unsupported signal: " + jobControlProto.getSignal());
+ }
+ } else if (jobControlProto.hasMessage()) {
+ getClientMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
+ }
+ }
+
+ private synchronized EventHandler<Void> getClientCloseDispatcher() {
+ if (clientCloseDispatcher != null) {
+ return clientCloseDispatcher;
+ } else {
+ synchronized (this) {
+ if (clientCloseDispatcher == null)
+ clientCloseDispatcher = new BroadCastEventHandler<>(clientCloseHandlers.get());
+ }
+ return clientCloseDispatcher;
+ }
+ }
+
+ private EventHandler<byte[]> getClientCloseWithMessageDispatcher() {
+ if (clientCloseWithMessageDispatcher != null) {
+ return clientCloseWithMessageDispatcher;
+ } else {
+ synchronized (this) {
+ if (clientCloseWithMessageDispatcher == null)
+ clientCloseWithMessageDispatcher = new BroadCastEventHandler<>(clientCloseWithMessageHandlers.get());
+ }
+ return clientCloseWithMessageDispatcher;
+ }
+ }
+
+ private EventHandler<byte[]> getClientMessageDispatcher() {
+ if (clientMessageDispatcher != null) {
+ return clientMessageDispatcher;
+ } else {
+ synchronized (this) {
+ if (clientMessageDispatcher == null)
+ clientMessageDispatcher = new BroadCastEventHandler<>(clientMessageHandlers.get());
+ }
+ return clientMessageDispatcher;
+ }
+ }
+
+}