You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/04/21 00:23:24 UTC
[2/4] reef git commit: [REEF-2002] Create Java project for gRPC two
process bridge
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java
new file mode 100644
index 0000000..e3f74c8
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.service;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.bridge.service.parameters.DriverClientCommand;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.driver.parameters.DriverIdleSources;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.RequiredImpl;
+import org.apache.reef.tang.formats.RequiredParameter;
+
+/**
+ * Binds all driver bridge service handlers to the driver.
+ * <p>
+ * {@link #CONF} merges {@link DriverConfiguration#CONF} with the bridge-specific
+ * bindings declared in this class.
+ */
+@Private
+public final class DriverServiceConfiguration extends ConfigurationModuleBuilder {
+
+ /** Required: the {@link IDriverService} implementation that {@link #CONF} binds. */
+ public static final RequiredImpl<IDriverService> DRIVER_SERVICE_IMPL = new RequiredImpl<>();
+
+ /** Required: the string value bound to the {@link DriverClientCommand} named parameter. */
+ public static final RequiredParameter<String> DRIVER_CLIENT_COMMAND = new RequiredParameter<>();
+
+ /**
+ * Configuration module for the driver service.
+ * It merges {@link DriverConfiguration#CONF}, binds {@link IDriverService} to
+ * {@link #DRIVER_SERVICE_IMPL}, binds {@link DriverClientCommand} to
+ * {@link #DRIVER_CLIENT_COMMAND}, and adds {@link IDriverService} as an entry
+ * of the {@link DriverIdleSources} set.
+ */
+ public static final ConfigurationModule CONF = new DriverServiceConfiguration()
+ .merge(DriverConfiguration.CONF)
+ .bindImplementation(IDriverService.class, DRIVER_SERVICE_IMPL)
+ .bindNamedParameter(DriverClientCommand.class, DRIVER_CLIENT_COMMAND)
+ .bindSetEntry(DriverIdleSources.class, IDriverService.class)
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java
new file mode 100644
index 0000000..cca2436
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.service;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Java-side driver event handlers of the bridge.
+ * Every handler logs the event it received and then hands it over,
+ * unchanged, to the injected {@link IDriverService}.
+ * The handlers are inner classes, so they all share that one service instance.
+ */
+@Unit
+public final class DriverServiceHandlers {
+
+  private static final Logger LOG = Logger.getLogger(DriverServiceHandlers.class.getName());
+
+  /** Service that receives every event seen by the handlers below. */
+  private final IDriverService driverBridgeService;
+
+  @Inject
+  private DriverServiceHandlers(final IDriverService driverBridgeService) {
+    this.driverBridgeService = driverBridgeService;
+  }
+
+  /**
+   * Forwards the driver start event to the driver service.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime start) {
+      LOG.log(Level.INFO, "JavaBridge: Start Driver");
+      driverBridgeService.startHandler(start);
+    }
+  }
+
+  /**
+   * Forwards the driver stop event to the driver service.
+   */
+  final class StopHandler implements EventHandler<StopTime> {
+    @Override
+    public void onNext(final StopTime stop) {
+      LOG.log(Level.INFO, "JavaBridge: Stop Driver");
+      driverBridgeService.stopHandler(stop);
+    }
+  }
+
+  /**
+   * Forwards an allocated evaluator event to the driver service.
+   */
+  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator evaluator) {
+      LOG.log(Level.INFO, "JavaBridge: Allocated Evaluator {0}", evaluator.getId());
+      driverBridgeService.allocatedEvaluatorHandler(evaluator);
+    }
+  }
+
+  /**
+   * Forwards a completed evaluator event to the driver service.
+   */
+  final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
+    @Override
+    public void onNext(final CompletedEvaluator evaluator) {
+      LOG.log(Level.INFO, "JavaBridge: Completed Evaluator {0}", evaluator.getId());
+      driverBridgeService.completedEvaluatorHandler(evaluator);
+    }
+  }
+
+  /**
+   * Forwards a failed evaluator event to the driver service.
+   */
+  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator evaluator) {
+      LOG.log(Level.INFO, "JavaBridge: Failed Evaluator {0}", evaluator.getId());
+      driverBridgeService.failedEvaluatorHandler(evaluator);
+    }
+  }
+
+  /**
+   * Forwards an active context event to the driver service.
+   */
+  final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+      LOG.log(Level.INFO, "JavaBridge: Active Context {0}", activeContext.getId());
+      driverBridgeService.activeContextHandler(activeContext);
+    }
+  }
+
+  /**
+   * Forwards a closed context event to the driver service.
+   */
+  final class ClosedContextHandler implements EventHandler<ClosedContext> {
+    @Override
+    public void onNext(final ClosedContext closedContext) {
+      LOG.log(Level.INFO, "JavaBridge: Closed Context {0}", closedContext.getId());
+      driverBridgeService.closedContextHandler(closedContext);
+    }
+  }
+
+  /**
+   * Forwards a message sent by a context to the driver service.
+   */
+  final class ContextMessageHandler implements EventHandler<ContextMessage> {
+    @Override
+    public void onNext(final ContextMessage contextMessage) {
+      LOG.log(Level.INFO, "JavaBridge: Context Message id {0}", contextMessage.getId());
+      driverBridgeService.contextMessageHandler(contextMessage);
+    }
+  }
+
+  /**
+   * Forwards a failed context event to the driver service.
+   */
+  final class ContextFailedHandler implements EventHandler<FailedContext> {
+    @Override
+    public void onNext(final FailedContext failedContext) {
+      LOG.log(Level.INFO, "JavaBridge: Context Failed {0}", failedContext.getId());
+      driverBridgeService.failedContextHandler(failedContext);
+    }
+  }
+
+  /**
+   * Forwards a running task event to the driver service.
+   */
+  final class RunningTaskHandler implements EventHandler<RunningTask> {
+    @Override
+    public void onNext(final RunningTask runningTask) {
+      LOG.log(Level.INFO, "JavaBridge: Running Task {0}", runningTask.getId());
+      driverBridgeService.runningTaskHandler(runningTask);
+    }
+  }
+
+  /**
+   * Forwards a failed task event to the driver service.
+   */
+  final class FailedTaskHandler implements EventHandler<FailedTask> {
+    @Override
+    public void onNext(final FailedTask failedTask) {
+      LOG.log(Level.INFO, "JavaBridge: Failed Task {0}", failedTask.getId());
+      driverBridgeService.failedTaskHandler(failedTask);
+    }
+  }
+
+  /**
+   * Forwards a completed task event to the driver service.
+   */
+  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask completedTask) {
+      LOG.log(Level.INFO, "JavaBridge: Completed Task {0}", completedTask.getId());
+      driverBridgeService.completedTaskHandler(completedTask);
+    }
+  }
+
+  /**
+   * Forwards a suspended task event to the driver service.
+   */
+  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
+    @Override
+    public void onNext(final SuspendedTask suspendedTask) {
+      LOG.log(Level.INFO, "JavaBridge: Suspended Task {0}", suspendedTask.getId());
+      driverBridgeService.suspendedTaskHandler(suspendedTask);
+    }
+  }
+
+  /**
+   * Forwards a message sent by a task to the driver service.
+   */
+  final class TaskMessageHandler implements EventHandler<TaskMessage> {
+    @Override
+    public void onNext(final TaskMessage taskMessage) {
+      LOG.log(Level.INFO, "JavaBridge: Message from Task {0}", taskMessage.getId());
+      driverBridgeService.taskMessageHandler(taskMessage);
+    }
+  }
+
+  /**
+   * Forwards a message sent by the client to the driver service.
+   */
+  final class ClientMessageHandler implements EventHandler<byte[]> {
+    @Override
+    public void onNext(final byte[] payload) {
+      LOG.log(Level.INFO, "JavaBridge: Message from Client");
+      driverBridgeService.clientMessageHandler(payload);
+    }
+  }
+
+  /**
+   * Forwards a client close event (without payload) to the driver service.
+   */
+  final class ClientCloseHandler implements EventHandler<Void> {
+    @Override
+    public void onNext(final Void ignored) {
+      LOG.log(Level.INFO, "JavaBridge: Close event from Client");
+      driverBridgeService.clientCloseHandler();
+    }
+  }
+
+  /**
+   * Forwards a client close event that carries a message to the driver service.
+   */
+  final class ClientCloseWithMessageHandler implements EventHandler<byte[]> {
+    @Override
+    public void onNext(final byte[] payload) {
+      LOG.log(Level.INFO, "JavaBridge: Close event with messages from Client");
+      driverBridgeService.clientCloseWithMessageHandler(payload);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java
new file mode 100644
index 0000000..fbafff9
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.service;
+
+import com.google.protobuf.util.JsonFormat;
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.bridge.client.JavaDriverClientLauncher;
+import org.apache.reef.bridge.examples.WindowsRuntimePathProvider;
+import org.apache.reef.bridge.proto.ClientProtocol;
+import org.apache.reef.bridge.service.grpc.GRPCDriverService;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.files.RuntimePathProvider;
+import org.apache.reef.runtime.common.files.UnixJVMPathProvider;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.OSUtils;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver Service Launcher - main class.
+ */
+public final class DriverServiceLauncher {
+
+ /**
+ * Standard Java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(DriverServiceLauncher.class.getName());
+
+ /**
+ * This class should not be instantiated.
+ */
+ private DriverServiceLauncher() {
+ throw new RuntimeException("Do not instantiate this class!");
+ }
+
+ /**
+ * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
+ *
+ * @param driverClientConfigurationProto containing which runtime to configure: local, yarn, azbatch
+ * @return (immutable) TANG Configuration object.
+ * @throws BindException if configuration commandLineInjector fails.
+ * @throws InjectionException if configuration commandLineInjector fails.
+ */
+ private static Configuration getRuntimeConfiguration(
+ final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
+ throws BindException {
+ switch (driverClientConfigurationProto.getRuntimeCase()) {
+ case LOCAL_RUNTIME:
+ return getLocalRuntimeConfiguration(driverClientConfigurationProto);
+ case YARN_RUNTIME:
+ return getYarnRuntimeConfiguration(driverClientConfigurationProto);
+ default:
+ throw new IllegalArgumentException("Unsupported runtime " + driverClientConfigurationProto.getRuntimeCase());
+ }
+ }
+
+ private static Configuration getLocalRuntimeConfiguration(
+ final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
+ throws BindException {
+ LOG.log(Level.FINE, "JavaBridge: Running on the local runtime");
+ return LocalRuntimeConfiguration.CONF
+ .build();
+ }
+
+ private static Configuration getYarnRuntimeConfiguration(
+ final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
+ throws BindException {
+ LOG.log(Level.FINE, "JavaBridge: Running on YARN");
+ return YarnClientConfiguration.CONF.build();
+ }
+
+ /**
+  * Translate the driver client configuration into the TANG configuration of the driver service.
+  * <p>
+  * The result binds {@link GRPCDriverService} as the driver service, the driver client launch
+  * command and the job id; the file dependencies; the driver resources (only when a positive
+  * value is given); one bridge handler per handler label listed in the configuration; and
+  * finally the TCP port settings (see {@code setTcpPortRange}).
+  *
+  * @param driverClientConfigurationProto the driver client configuration to translate.
+  * @return the driver service configuration.
+  * @throws IllegalArgumentException if the configuration does not list the START handler.
+  */
+ private static Configuration getDriverServiceConfiguration(
+     final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) {
+   // Set required parameters
+   ConfigurationModule module = DriverServiceConfiguration.CONF
+       .set(DriverServiceConfiguration.DRIVER_SERVICE_IMPL, GRPCDriverService.class)
+       .set(DriverServiceConfiguration.DRIVER_CLIENT_COMMAND,
+           driverClientConfigurationProto.getDriverClientLaunchCommand())
+       .set(DriverConfiguration.DRIVER_IDENTIFIER, driverClientConfigurationProto.getJobid());
+
+   // Set file dependencies; the location of the bridge service class is always shipped.
+   final List<String> localLibraries = new ArrayList<>();
+   localLibraries.add(EnvironmentUtils.getClassLocation(GRPCDriverService.class));
+   localLibraries.addAll(driverClientConfigurationProto.getLocalLibrariesList());
+   module = module.setMultiple(DriverConfiguration.LOCAL_LIBRARIES, localLibraries);
+   if (driverClientConfigurationProto.getGlobalLibrariesCount() > 0) {
+     module = module.setMultiple(DriverConfiguration.GLOBAL_LIBRARIES,
+         driverClientConfigurationProto.getGlobalLibrariesList());
+   }
+   if (driverClientConfigurationProto.getLocalFilesCount() > 0) {
+     module = module.setMultiple(DriverConfiguration.LOCAL_FILES,
+         driverClientConfigurationProto.getLocalFilesList());
+   }
+   if (driverClientConfigurationProto.getGlobalFilesCount() > 0) {
+     module = module.setMultiple(DriverConfiguration.GLOBAL_FILES,
+         driverClientConfigurationProto.getGlobalFilesList());
+   }
+
+   // Setup driver resources; non-positive values leave the parameter unset.
+   if (driverClientConfigurationProto.getCpuCores() > 0) {
+     module = module.set(DriverConfiguration.DRIVER_CPU_CORES, driverClientConfigurationProto.getCpuCores());
+   }
+   if (driverClientConfigurationProto.getMemoryMb() > 0) {
+     module = module.set(DriverConfiguration.DRIVER_MEMORY, driverClientConfigurationProto.getMemoryMb());
+   }
+
+   // Setup handlers
+   final Set<ClientProtocol.DriverClientConfiguration.Handlers> handlerLabelSet =
+       new HashSet<>(driverClientConfigurationProto.getHandlerList());
+   if (!handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.START)) {
+     throw new IllegalArgumentException("Start handler required");
+   }
+   // The stop handler is bound together with the (mandatory) start handler.
+   module = module
+       .set(DriverConfiguration.ON_DRIVER_STARTED, DriverServiceHandlers.StartHandler.class)
+       .set(DriverConfiguration.ON_DRIVER_STOP, DriverServiceHandlers.StopHandler.class);
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_ALLOCATED)) {
+     module = module
+         .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverServiceHandlers.AllocatedEvaluatorHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_COMPLETED)) {
+     module = module
+         .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, DriverServiceHandlers.CompletedEvaluatorHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_FAILED)) {
+     module = module
+         .set(DriverConfiguration.ON_EVALUATOR_FAILED, DriverServiceHandlers.FailedEvaluatorHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_ACTIVE)) {
+     module = module
+         .set(DriverConfiguration.ON_CONTEXT_ACTIVE, DriverServiceHandlers.ActiveContextHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_CLOSED)) {
+     module = module
+         .set(DriverConfiguration.ON_CONTEXT_CLOSED, DriverServiceHandlers.ClosedContextHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_FAILED)) {
+     module = module
+         .set(DriverConfiguration.ON_CONTEXT_FAILED, DriverServiceHandlers.ContextFailedHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_MESSAGE)) {
+     module = module
+         .set(DriverConfiguration.ON_CONTEXT_MESSAGE, DriverServiceHandlers.ContextMessageHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_RUNNING)) {
+     module = module
+         .set(DriverConfiguration.ON_TASK_RUNNING, DriverServiceHandlers.RunningTaskHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_COMPLETED)) {
+     module = module
+         .set(DriverConfiguration.ON_TASK_COMPLETED, DriverServiceHandlers.CompletedTaskHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_FAILED)) {
+     module = module
+         .set(DriverConfiguration.ON_TASK_FAILED, DriverServiceHandlers.FailedTaskHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_MESSAGE)) {
+     module = module
+         .set(DriverConfiguration.ON_TASK_MESSAGE, DriverServiceHandlers.TaskMessageHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_MESSAGE)) {
+     module = module
+         .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverServiceHandlers.ClientMessageHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_CLOSE)) {
+     module = module
+         .set(DriverConfiguration.ON_CLIENT_CLOSED, DriverServiceHandlers.ClientCloseHandler.class);
+   }
+   if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_CLOSE_WITH_MESSAGE)) {
+     module = module
+         .set(DriverConfiguration.ON_CLIENT_CLOSED_MESSAGE, DriverServiceHandlers.ClientCloseWithMessageHandler.class);
+   }
+
+   return setTcpPortRange(driverClientConfigurationProto, module.build());
+ }
+
+ /**
+  * Extend the driver service configuration with the TCP port settings of the driver client
+  * configuration.
+  * <p>
+  * {@link TcpPortConfigurationProvider} is always added to the driver configuration providers.
+  * Each of the three port parameters (range begin, range count, try count) is bound only when
+  * the configured value is positive; otherwise it is left unbound.
+  *
+  * @param driverClientConfigurationProto source of the TCP port settings.
+  * @param driverServiceConfiguration the configuration to extend.
+  * @return a new configuration containing the driver service configuration plus the port settings.
+  */
+ private static Configuration setTcpPortRange(
+     final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto,
+     final Configuration driverServiceConfiguration) {
+   JavaConfigurationBuilder configurationModuleBuilder =
+       Tang.Factory.getTang().newConfigurationBuilder(driverServiceConfiguration)
+           .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class);
+   // Setup TCP constraints
+   if (driverClientConfigurationProto.getTcpPortRangeBegin() > 0) {
+     configurationModuleBuilder = configurationModuleBuilder
+         .bindNamedParameter(TcpPortRangeBegin.class,
+             Integer.toString(driverClientConfigurationProto.getTcpPortRangeBegin()));
+   }
+   if (driverClientConfigurationProto.getTcpPortRangeCount() > 0) {
+     configurationModuleBuilder = configurationModuleBuilder
+         .bindNamedParameter(TcpPortRangeCount.class,
+             Integer.toString(driverClientConfigurationProto.getTcpPortRangeCount()));
+   }
+   if (driverClientConfigurationProto.getTcpPortRangeTryCount() > 0) {
+     // The try count has its own named parameter; binding it to TcpPortRangeCount
+     // would overwrite the range count and leave the try count unset.
+     configurationModuleBuilder = configurationModuleBuilder
+         .bindNamedParameter(TcpPortRangeTryCount.class,
+             Integer.toString(driverClientConfigurationProto.getTcpPortRangeTryCount()));
+   }
+   return configurationModuleBuilder.build();
+ }
+
+ /**
+  * Submit a job whose driver client is the Java driver client.
+  * <p>
+  * The driver client TANG configuration is serialized to the file {@code driverclient.conf}
+  * in the current working directory and shipped as a local file of the driver. The launch
+  * command of {@link JavaDriverClientLauncher} is assembled from the runtime's class path
+  * and path providers and stored in a copy of the given protocol buffer before the driver
+  * service configuration is built and the job is run. The temporary file is removed when
+  * the call returns, whether or not the launch succeeded.
+  *
+  * @param driverClientConfigurationProto driver client settings (runtime, handlers, files, ...).
+  * @param driverClientConfiguration TANG configuration of the driver client.
+  * @return the status returned by the driver launcher.
+  * @throws InjectionException if a TANG injection fails.
+  * @throws IOException if the driver client configuration cannot be written to file.
+  */
+ public static LauncherStatus submit(
+     final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto,
+     final Configuration driverClientConfiguration)
+     throws InjectionException, IOException {
+   final ClientProtocol.DriverClientConfiguration.Builder builder =
+       ClientProtocol.DriverClientConfiguration.newBuilder(driverClientConfigurationProto);
+   final File driverClientConfigurationFile = new File("driverclient.conf");
+   try {
+     // Write driver client configuration to a file
+     final Injector driverClientInjector = Tang.Factory.getTang().newInjector(driverClientConfiguration);
+     final ConfigurationSerializer configurationSerializer =
+         driverClientInjector.getInstance(ConfigurationSerializer.class);
+     configurationSerializer.toFile(driverClientConfiguration, driverClientConfigurationFile);
+
+     // Get runtime injector and piece together the launch command based on its classpath info
+     final Configuration runtimeConfiguration = getRuntimeConfiguration(driverClientConfigurationProto);
+     // Resolve OS Runtime Path Provider
+     final Configuration runtimeOSConfiguration = Configurations.merge(
+         Tang.Factory.getTang().newConfigurationBuilder()
+             .bind(RuntimePathProvider.class,
+                 OSUtils.isWindows() ? WindowsRuntimePathProvider.class : UnixJVMPathProvider.class)
+             .build(),
+         runtimeConfiguration);
+     final Injector runtimeInjector = Tang.Factory.getTang().newInjector(runtimeOSConfiguration);
+     final REEFFileNames fileNames = runtimeInjector.getInstance(REEFFileNames.class);
+     final ClasspathProvider classpathProvider = runtimeInjector.getInstance(ClasspathProvider.class);
+     final RuntimePathProvider runtimePathProvider = runtimeInjector.getInstance(RuntimePathProvider.class);
+     final List<String> launchCommand = new JavaLaunchCommandBuilder(JavaDriverClientLauncher.class, null)
+         .setConfigurationFilePaths(
+             Collections.singletonList("./" + fileNames.getLocalFolderPath() + "/" +
+                 driverClientConfigurationFile.getName()))
+         .setJavaPath(runtimePathProvider.getPath())
+         .setClassPath(classpathProvider.getEvaluatorClasspath())
+         .build();
+     final String cmd = StringUtils.join(launchCommand, ' ');
+     builder.setDriverClientLaunchCommand(cmd);
+     builder.addLocalFiles(driverClientConfigurationFile.getAbsolutePath());
+
+     // Configure driver service and launch the job
+     final Configuration driverServiceConfiguration = getDriverServiceConfiguration(builder.build());
+     return DriverLauncher.getLauncher(runtimeOSConfiguration).run(driverServiceConfiguration);
+   } finally {
+     // Best-effort cleanup: report (but do not fail on) a file that could not be removed.
+     // The exists() check keeps quiet when the file was never written.
+     if (!driverClientConfigurationFile.delete() && driverClientConfigurationFile.exists()) {
+       LOG.log(Level.WARNING, "JavaBridge: Unable to delete temporary file {0}",
+           driverClientConfigurationFile.getAbsolutePath());
+     }
+   }
+ }
+
+ /**
+  * Main method that launches the REEF job.
+  * <p>
+  * Expects exactly one argument: the path of a file holding the JSON form of a
+  * {@code ClientProtocol.DriverClientConfiguration}. With any other number of
+  * arguments the usage error is logged and nothing is launched.
+  *
+  * @param args command line parameters.
+  * @throws RuntimeException if the configuration file cannot be read.
+  */
+ public static void main(final String[] args) {
+   if (args.length != 1) {
+     LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() +
+         " accepts single argument referencing a file that contains a client protocol buffer driver configuration");
+     // Without this return, args[0] below fails with ArrayIndexOutOfBoundsException when no argument is given.
+     return;
+   }
+   try {
+     final String content;
+     try {
+       // JSON text is UTF-8; do not depend on the platform default charset.
+       content = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
+     } catch (final IOException e) {
+       throw new RuntimeException("Unable to read driver client configuration file " + args[0], e);
+     }
+     final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder =
+         ClientProtocol.DriverClientConfiguration.newBuilder();
+     JsonFormat.parser()
+         .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry())
+         .merge(content, driverClientConfigurationProtoBuilder);
+     final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto =
+         driverClientConfigurationProtoBuilder.build();
+
+     final Configuration runtimeConfig = getRuntimeConfiguration(driverClientConfigurationProto);
+     final Configuration driverConfig = getDriverServiceConfiguration(driverClientConfigurationProto);
+     DriverLauncher.getLauncher(runtimeConfig).run(driverConfig);
+     LOG.log(Level.INFO, "JavaBridge: Stop Client {0}", driverClientConfigurationProto.getJobid());
+   } catch (final BindException | InjectionException | IOException ex) {
+     LOG.log(Level.SEVERE, "Job configuration error", ex);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/IDriverService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/IDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/IDriverService.java
new file mode 100644
index 0000000..612f00d
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/IDriverService.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.service;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+/**
+ * Contract of a driver service: one callback per driver event.
+ * A driver service is also a {@link DriverIdlenessSource}.
+ */
+public interface IDriverService extends DriverIdlenessSource {
+
+  /**
+   * Called with the driver start event.
+   * @param startTime the start event
+   */
+  void startHandler(StartTime startTime);
+
+  /**
+   * Called with the driver stop event.
+   * @param stopTime the stop event
+   */
+  void stopHandler(StopTime stopTime);
+
+  /**
+   * Called when an evaluator has been allocated.
+   * @param eval the allocated evaluator
+   */
+  void allocatedEvaluatorHandler(AllocatedEvaluator eval);
+
+  /**
+   * Called when an evaluator has completed.
+   * @param eval the completed evaluator
+   */
+  void completedEvaluatorHandler(CompletedEvaluator eval);
+
+  /**
+   * Called when an evaluator has failed.
+   * @param eval the failed evaluator
+   */
+  void failedEvaluatorHandler(FailedEvaluator eval);
+
+  /**
+   * Called when a context has become active.
+   * @param context the active context
+   */
+  void activeContextHandler(ActiveContext context);
+
+  /**
+   * Called when a context has been closed.
+   * @param context the closed context
+   */
+  void closedContextHandler(ClosedContext context);
+
+  /**
+   * Called with a message sent by a context.
+   * @param message the context message
+   */
+  void contextMessageHandler(ContextMessage message);
+
+  /**
+   * Called when a context has failed.
+   * @param context the failed context
+   */
+  void failedContextHandler(FailedContext context);
+
+  /**
+   * Called when a task has started running.
+   * @param task the running task
+   */
+  void runningTaskHandler(RunningTask task);
+
+  /**
+   * Called when a task has failed.
+   * @param task the failed task
+   */
+  void failedTaskHandler(FailedTask task);
+
+  /**
+   * Called when a task has completed.
+   * @param task the completed task
+   */
+  void completedTaskHandler(CompletedTask task);
+
+  /**
+   * Called when a task has been suspended.
+   * @param task the suspended task
+   */
+  void suspendedTaskHandler(SuspendedTask task);
+
+  /**
+   * Called with a message sent by a task.
+   * @param message the task message
+   */
+  void taskMessageHandler(TaskMessage message);
+
+  /**
+   * Called with a message sent by the client.
+   * @param message the raw client message
+   */
+  void clientMessageHandler(byte[] message);
+
+  /**
+   * Called when the client sends a close event without a message.
+   */
+  void clientCloseHandler();
+
+  /**
+   * Called when the client sends a close event that carries a message.
+   * @param message the raw client message
+   */
+  void clientCloseWithMessageHandler(byte[] message);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/RuntimeNames.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/RuntimeNames.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/RuntimeNames.java
new file mode 100644
index 0000000..7c7de47
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/RuntimeNames.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.service;
+
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Runtime names supported by the bridge.
+ */
+@Private
+public final class RuntimeNames {
+ public static final String LOCAL = "local";
+
+ public static final String YARN = "yarn";
+
+ public static final String AZBATCH = "azbatch";
+
+ private RuntimeNames() {}
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/GRPCDriverService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/GRPCDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/GRPCDriverService.java
new file mode 100644
index 0000000..3f7a131
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/GRPCDriverService.java
@@ -0,0 +1,706 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.service.grpc;
+
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.reef.bridge.service.DriverClientException;
+import org.apache.reef.bridge.service.IDriverService;
+import org.apache.reef.bridge.service.parameters.DriverClientCommand;
+import org.apache.reef.bridge.proto.*;
+import org.apache.reef.bridge.proto.Void;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
+import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
+import org.apache.reef.runtime.common.driver.idle.IdleMessage;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.OSUtils;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * GRPC DriverBridgeService that interacts with higher-level languages.
+ */
+public final class GRPCDriverService implements IDriverService {
+ private static final Logger LOG = Logger.getLogger(GRPCDriverService.class.getName());
+
+ /** gRPC server hosting the driver bridge service; assigned in start() and cleared in stop(Throwable). */
+ private Server server;
+
+ /** Driver client process launched by start(); destroyed and cleared in stop(Throwable). */
+ private Process driverProcess;
+
+ /** Blocking stub used to call the driver client; null until registerDriverClient assigns it. */
+ private DriverClientGrpc.DriverClientBlockingStub clientStub;
+
+ /** Clock used to stop the driver and to schedule alarms requested by the driver client. */
+ private final Clock clock;
+
+ /** Used by requestResources to build and submit evaluator requests. */
+ private final EvaluatorRequestor evaluatorRequestor;
+
+ /** Creates the JVM evaluator process configured in setJVMProcess. */
+ private final JVMProcessFactory jvmProcessFactory;
+
+ /** Creates the CLR evaluator process configured in setCLRProcess. */
+ private final CLRProcessFactory clrProcessFactory;
+
+ /** Supplies the candidate ports that start() tries when binding the gRPC server. */
+ private final TcpPortProvider tcpPortProvider;
+
+ /** Command used to launch the driver client; start() appends the gRPC server port to it. */
+ private final String driverClientCommand;
+
+ /** Allocated evaluators by evaluator id; every access in this class holds this object's monitor. */
+ private final Map<String, AllocatedEvaluator> allocatedEvaluatorMap = new HashMap<>();
+
+ /** Active contexts by context id; every access in this class holds this object's monitor. */
+ private final Map<String, ActiveContext> activeContextMap = new HashMap<>();
+
+ /** Running tasks by task id; every access in this class holds this object's monitor. */
+ private final Map<String, RunningTask> runningTaskMap = new HashMap<>();
+
+ /** Set to true at the end of the first stop(Throwable) call; later calls are no-ops. */
+ private boolean stopped = false;
+
+ /**
+  * Injection constructor; only stores its collaborators.
+  * The gRPC server and the driver client process are created later, by start().
+  */
+ @Inject
+ private GRPCDriverService(
+     final Clock clock,
+     final EvaluatorRequestor evaluatorRequestor,
+     final JVMProcessFactory jvmProcessFactory,
+     final CLRProcessFactory clrProcessFactory,
+     final TcpPortProvider tcpPortProvider,
+     @Parameter(DriverClientCommand.class) final String driverClientCommand) {
+   this.clock = clock;
+   this.evaluatorRequestor = evaluatorRequestor;
+   this.jvmProcessFactory = jvmProcessFactory;
+   this.clrProcessFactory = clrProcessFactory;
+   this.tcpPortProvider = tcpPortProvider;
+   this.driverClientCommand = driverClientCommand;
+ }
+
+ private void start() throws IOException {
+ for (final Integer port : this.tcpPortProvider) {
+ try {
+ this.server = ServerBuilder.forPort(port)
+ .addService(new DriverBridgeServiceImpl())
+ .build()
+ .start();
+ LOG.info("Server started, listening on " + port);
+ break;
+ } catch (IOException e) {
+ LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port);
+ }
+ }
+ if (this.server == null || this.server.isTerminated()) {
+ throw new IOException("Unable to start gRPC server");
+ } else {
+ final String cmd = this.driverClientCommand + " " + this.server.getPort();
+ final String cmdOs = OSUtils.isWindows() ? "cmd.exe /c \"" + cmd + "\"" : cmd;
+ final String cmdStd = cmdOs + " 1> driverclient.stdout 2> driverclient.stderr";
+ this.driverProcess = Runtime.getRuntime().exec(cmdStd);
+ }
+ }
+
+ /**
+  * Stop the service without a failure cause.
+  */
+ private void stop() {
+   this.stop(null);
+ }
+
+ /**
+  * Shut down the gRPC server, destroy the driver client process and stop the clock.
+  * Does nothing once a previous call has run.
+  * @param t cause passed to the clock, or null to stop the clock without a cause
+  */
+ private void stop(final Throwable t) {
+   if (stopped) {
+     return;
+   }
+   try {
+     if (server != null) {
+       this.server.shutdown();
+       this.server = null;
+     }
+     if (this.driverProcess != null) {
+       this.driverProcess.destroy();
+       this.driverProcess = null;
+     }
+     if (t == null) {
+       clock.stop();
+     } else {
+       clock.stop(t);
+     }
+   } finally {
+     // marked as stopped even when one of the shutdown steps throws
+     stopped = true;
+   }
+ }
+
+ /**
+  * Block the calling thread until the gRPC server has terminated; returns
+  * immediately when there is no server. Needed on the main thread because
+  * the grpc library uses daemon threads.
+  * @throws InterruptedException if interrupted while waiting
+  */
+ private void blockUntilShutdown() throws InterruptedException {
+   if (server == null) {
+     return;
+   }
+   server.awaitTermination();
+ }
+
+ /**
+  * Check whether the driver client process is still running. The check asks for
+  * the process exit value, which throws {@link IllegalThreadStateException}
+  * while the process has not yet exited.
+  * @return true if a driver process exists and has not exited, false otherwise
+  */
+ private boolean driverProcessIsAlive() {
+   if (this.driverProcess == null) {
+     return false;
+   }
+   try {
+     this.driverProcess.exitValue();
+     return false;
+   } catch (IllegalThreadStateException stillRunning) {
+     return true;
+   }
+ }
+
+ /**
+  * Convert an evaluator descriptor to its protocol buffer form, carrying the
+  * number of cores, the memory and the runtime name.
+  * @param descriptor descriptor to convert, may be null
+  * @return the converted descriptor, or null when the descriptor is null
+  */
+ private EvaluatorDescriptorInfo toEvaluatorDescriptorInfo(final EvaluatorDescriptor descriptor) {
+   return descriptor == null ? null : EvaluatorDescriptorInfo.newBuilder()
+       .setCores(descriptor.getNumberOfCores())
+       .setMemory(descriptor.getMemory())
+       .setRuntimeName(descriptor.getRuntimeName())
+       .build();
+ }
+
+ /**
+  * Ask the driver client whether it is idle.
+  * @return the idle status reported by the driver client, or a not-idle
+  * message when no driver client has registered yet
+  */
+ @Override
+ public IdleMessage getIdleStatus() {
+   final String componentName = "Java Bridge DriverService";
+   if (this.clientStub == null) {
+     // There is no client to ask yet; report not idle rather than fail with a NullPointerException.
+     return new IdleMessage(componentName, "Driver client is not registered", false);
+   }
+   // A null request cannot be marshalled; send an empty Void message like the other calls do.
+   final IdleStatus idleStatus = this.clientStub.idlenessCheckHandler(Void.newBuilder().build());
+   return new IdleMessage(
+       componentName,
+       idleStatus.getReason(),
+       idleStatus.getIsIdle());
+ }
+
+ /**
+  * Start the gRPC server and the driver client process, wait for the driver
+  * client to register, and forward the start time to it. The service is stopped
+  * with the failure as cause when the server or the client cannot be started.
+  * @param startTime driver start time
+  */
+ @Override
+ public void startHandler(final StartTime startTime) {
+   try {
+     start();
+     synchronized (this) {
+       // wait for driver client process to register; re-check every second so
+       // that a client process that died before registering is noticed
+       while (this.clientStub == null && driverProcessIsAlive()) {
+         this.wait(1000); // a second
+       }
+       if (this.clientStub != null) {
+         this.clientStub.startHandler(
+             StartTimeInfo.newBuilder().setStartTime(startTime.getTimestamp()).build());
+       } else {
+         stop(new IllegalStateException("Unable to start driver client"));
+       }
+     }
+   } catch (IOException e) {
+     LOG.log(Level.SEVERE, "Unable to start the driver bridge service", e);
+     stop(e);
+   } catch (InterruptedException e) {
+     // keep the interrupt visible to the calling thread
+     Thread.currentThread().interrupt();
+     LOG.log(Level.SEVERE, "Interrupted while waiting for the driver client to register", e);
+     stop(e);
+   }
+ }
+
+ /**
+  * Forward the stop time to the driver client, if one registered, and then
+  * stop this service whether or not the call succeeded.
+  * @param stopTime driver stop time
+  */
+ @Override
+ public void stopHandler(final StopTime stopTime) {
+   synchronized (this) {
+     try {
+       if (clientStub != null) {
+         final StopTimeInfo stopTimeInfo =
+             StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build();
+         this.clientStub.stopHandler(stopTimeInfo);
+       }
+     } finally {
+       stop();
+     }
+   }
+ }
+
+ /**
+  * Record the allocated evaluator and notify the driver client.
+  * @param eval allocated evaluator
+  */
+ @Override
+ public void allocatedEvaluatorHandler(final AllocatedEvaluator eval) {
+   synchronized (this) {
+     this.allocatedEvaluatorMap.put(eval.getId(), eval);
+     final EvaluatorInfo.Builder evaluatorInfo = EvaluatorInfo.newBuilder()
+         .setEvaluatorId(eval.getId());
+     // toEvaluatorDescriptorInfo returns null for a missing descriptor, and
+     // protocol buffer setters reject null, so leave the field unset then.
+     final EvaluatorDescriptorInfo descriptorInfo =
+         toEvaluatorDescriptorInfo(eval.getEvaluatorDescriptor());
+     if (descriptorInfo != null) {
+       evaluatorInfo.setDescriptorInfo(descriptorInfo);
+     }
+     this.clientStub.allocatedEvaluatorHandler(evaluatorInfo.build());
+   }
+ }
+
+ /**
+  * Forget the completed evaluator and notify the driver client.
+  * @param eval completed evaluator
+  */
+ @Override
+ public void completedEvaluatorHandler(final CompletedEvaluator eval) {
+   synchronized (this) {
+     this.allocatedEvaluatorMap.remove(eval.getId());
+     final EvaluatorInfo evaluatorInfo = EvaluatorInfo.newBuilder()
+         .setEvaluatorId(eval.getId())
+         .build();
+     this.clientStub.completedEvaluatorHandler(evaluatorInfo);
+   }
+ }
+
+ /**
+  * Forget the failed evaluator and notify the driver client.
+  * @param eval failed evaluator
+  */
+ @Override
+ public void failedEvaluatorHandler(final FailedEvaluator eval) {
+   synchronized (this) {
+     this.allocatedEvaluatorMap.remove(eval.getId());
+     final EvaluatorInfo evaluatorInfo = EvaluatorInfo.newBuilder()
+         .setEvaluatorId(eval.getId())
+         .build();
+     this.clientStub.failedEvaluatorHandler(evaluatorInfo);
+   }
+ }
+
+ /**
+  * Record the active context and notify the driver client.
+  * @param context context that became active
+  */
+ @Override
+ public void activeContextHandler(final ActiveContext context) {
+   synchronized (this) {
+     this.activeContextMap.put(context.getId(), context);
+     final ContextInfo.Builder contextInfo = ContextInfo.newBuilder()
+         .setContextId(context.getId())
+         .setEvaluatorId(context.getEvaluatorId());
+     // Protocol buffer setters reject null: a context without a parent
+     // leaves the parent id unset instead of passing null.
+     if (context.getParentId().isPresent()) {
+       contextInfo.setParentId(context.getParentId().get());
+     }
+     this.clientStub.activeContextHandler(contextInfo.build());
+   }
+ }
+
+ /**
+  * Forget the closed context and notify the driver client.
+  * @param context context that closed
+  */
+ @Override
+ public void closedContextHandler(final ClosedContext context) {
+   synchronized (this) {
+     this.activeContextMap.remove(context.getId());
+     final ContextInfo contextInfo = ContextInfo.newBuilder()
+         .setContextId(context.getId())
+         .setEvaluatorId(context.getEvaluatorId())
+         .setParentId(context.getParentContext().getId())
+         .build();
+     this.clientStub.closedContextHandler(contextInfo);
+   }
+ }
+
+ /**
+  * Forget the failed context and notify the driver client of the failure.
+  * @param context context that failed
+  */
+ @Override
+ public void failedContextHandler(final FailedContext context) {
+   synchronized (this) {
+     this.activeContextMap.remove(context.getId());
+     final ContextInfo.Builder contextInfo = ContextInfo.newBuilder()
+         .setContextId(context.getId())
+         .setEvaluatorId(context.getEvaluatorId());
+     // Protocol buffer setters reject null: a context without a parent
+     // leaves the parent id unset instead of passing null.
+     if (context.getParentContext().isPresent()) {
+       contextInfo.setParentId(context.getParentContext().get().getId());
+     }
+     // A failure is reported through the failed-context call, not the closed-context one.
+     this.clientStub.failedContextHandler(contextInfo.build());
+   }
+ }
+
+ /**
+  * Forward a context message to the driver client.
+  * @param message message sent by a context
+  */
+ @Override
+ public void contextMessageHandler(final ContextMessage message) {
+   synchronized (this) {
+     final ContextMessageInfo messageInfo = ContextMessageInfo.newBuilder()
+         .setContextId(message.getId())
+         .setMessageSourceId(message.getMessageSourceID())
+         .setSequenceNumber(message.getSequenceNumber())
+         .setPayload(ByteString.copyFrom(message.get()))
+         .build();
+     this.clientStub.contextMessageHandler(messageInfo);
+   }
+ }
+
+ /**
+  * Record the running task and notify the driver client.
+  * @param task task that is now running
+  */
+ @Override
+ public void runningTaskHandler(final RunningTask task) {
+   synchronized (this) {
+     this.runningTaskMap.put(task.getId(), task);
+     final TaskInfo taskInfo = TaskInfo.newBuilder()
+         .setTaskId(task.getId())
+         .setContextId(task.getActiveContext().getId())
+         .build();
+     this.clientStub.runningTaskHandler(taskInfo);
+   }
+ }
+
+ /**
+  * Forget the failed task and notify the driver client.
+  * @param task task that failed
+  */
+ @Override
+ public void failedTaskHandler(final FailedTask task) {
+   synchronized (this) {
+     this.runningTaskMap.remove(task.getId());
+     final TaskInfo.Builder taskInfo = TaskInfo.newBuilder()
+         .setTaskId(task.getId());
+     // Protocol buffer setters reject null: a failed task without an active
+     // context leaves the context id unset instead of passing null.
+     if (task.getActiveContext().isPresent()) {
+       taskInfo.setContextId(task.getActiveContext().get().getId());
+     }
+     this.clientStub.failedTaskHandler(taskInfo.build());
+   }
+ }
+
+ /**
+  * Forget the completed task and notify the driver client.
+  * @param task task that completed
+  */
+ @Override
+ public void completedTaskHandler(final CompletedTask task) {
+   synchronized (this) {
+     this.runningTaskMap.remove(task.getId());
+     final TaskInfo taskInfo = TaskInfo.newBuilder()
+         .setTaskId(task.getId())
+         .setContextId(task.getActiveContext().getId())
+         .build();
+     this.clientStub.completedTaskHandler(taskInfo);
+   }
+ }
+
+ /**
+  * Forget the suspended task and notify the driver client.
+  * @param task task that is suspended
+  */
+ @Override
+ public void suspendedTaskHandler(final SuspendedTask task) {
+   synchronized (this) {
+     this.runningTaskMap.remove(task.getId());
+     final TaskInfo taskInfo = TaskInfo.newBuilder()
+         .setTaskId(task.getId())
+         .setContextId(task.getActiveContext().getId())
+         .build();
+     this.clientStub.suspendedTaskHandler(taskInfo);
+   }
+ }
+
+ /**
+  * Forward a task message to the driver client.
+  * @param message message sent by a task
+  */
+ @Override
+ public void taskMessageHandler(final TaskMessage message) {
+   synchronized (this) {
+     final TaskMessageInfo messageInfo = TaskMessageInfo.newBuilder()
+         .setTaskId(message.getId())
+         .setContextId(message.getContextId())
+         .setMessageSourceId(message.getMessageSourceID())
+         .setSequenceNumber(message.getSequenceNumber())
+         .setPayload(ByteString.copyFrom(message.get()))
+         .build();
+     this.clientStub.taskMessageHandler(messageInfo);
+   }
+ }
+
+ /**
+  * Forward a client message to the driver client.
+  * @param message message sent by the client
+  */
+ @Override
+ public void clientMessageHandler(final byte[] message) {
+   synchronized (this) {
+     final ClientMessageInfo messageInfo = ClientMessageInfo.newBuilder()
+         .setPayload(ByteString.copyFrom(message))
+         .build();
+     this.clientStub.clientMessageHandler(messageInfo);
+   }
+ }
+
+ /**
+  * Notify the driver client that the client closed.
+  */
+ @Override
+ public void clientCloseHandler() {
+   synchronized (this) {
+     final Void empty = Void.newBuilder().build();
+     this.clientStub.clientCloseHandler(empty);
+   }
+ }
+
+ /**
+  * Notify the driver client that the client closed with a message.
+  * @param message message sent by the client
+  */
+ @Override
+ public void clientCloseWithMessageHandler(final byte[] message) {
+   synchronized (this) {
+     final ClientMessageInfo messageInfo = ClientMessageInfo.newBuilder()
+         .setPayload(ByteString.copyFrom(message))
+         .build();
+     this.clientStub.clientCloseWithMessageHandler(messageInfo);
+   }
+ }
+
+ private final class DriverBridgeServiceImpl
+ extends DriverServiceGrpc.DriverServiceImplBase {
+
+ /**
+  * Register the driver client: open a plaintext channel to the host and port
+  * it announces, publish the resulting blocking stub, and wake up any thread
+  * waiting for the registration.
+  * @param request host and port of the driver client
+  * @param responseObserver receives the empty response
+  */
+ @Override
+ public void registerDriverClient(
+     final DriverClientRegistration request,
+     final StreamObserver<Void> responseObserver) {
+   try {
+     final ManagedChannel channel = ManagedChannelBuilder
+         .forAddress(request.getHost(), request.getPort())
+         .usePlaintext(true)
+         .build();
+     synchronized (GRPCDriverService.this) {
+       GRPCDriverService.this.clientStub = DriverClientGrpc.newBlockingStub(channel);
+       GRPCDriverService.this.notifyAll();
+     }
+   } finally {
+     // a null response cannot be marshalled; reply with an empty Void message
+     responseObserver.onNext(Void.newBuilder().build());
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Translate a resource request from the driver client into an evaluator
+  * request and submit it to the evaluator requestor.
+  * @param request resources requested by the driver client
+  * @param responseObserver receives the empty response
+  */
+ @Override
+ public void requestResources(
+     final ResourceRequest request,
+     final StreamObserver<Void> responseObserver) {
+   try {
+     synchronized (GRPCDriverService.this) {
+       final EvaluatorRequest.Builder requestBuilder =
+           GRPCDriverService.this.evaluatorRequestor.newRequest();
+       requestBuilder.setNumber(request.getResourceCount());
+       requestBuilder.setNumberOfCores(request.getCores());
+       requestBuilder.setMemory(request.getMemorySize());
+       requestBuilder.setRelaxLocality(request.getRelaxLocality());
+       requestBuilder.setRuntimeName(request.getRuntimeName());
+       if (request.getNodeNameListCount() > 0) {
+         requestBuilder.addNodeNames(request.getNodeNameListList());
+       }
+       for (final String rackName : request.getRackNameListList()) {
+         requestBuilder.addRackName(rackName);
+       }
+       GRPCDriverService.this.evaluatorRequestor.submit(requestBuilder.build());
+     }
+   } finally {
+     // a null response cannot be marshalled; reply with an empty Void message
+     responseObserver.onNext(Void.newBuilder().build());
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Stop the clock on behalf of the driver client. When the request carries an
+  * exception, the clock is stopped with a {@link DriverClientException} holding
+  * its message; otherwise the clock is stopped without a cause.
+  * @param request shutdown request, optionally carrying an exception
+  * @param responseObserver receives the empty response
+  */
+ @Override
+ public void shutdown(
+     final ShutdownRequest request,
+     final StreamObserver<Void> responseObserver) {
+   try {
+     synchronized (GRPCDriverService.this) {
+       // Protocol buffer getters never return null, so a null check would treat
+       // every shutdown as a failure; hasException() tells whether one was sent.
+       if (request.hasException()) {
+         GRPCDriverService.this.clock.stop(
+             new DriverClientException(request.getException().getMessage()));
+       } else {
+         GRPCDriverService.this.clock.stop();
+       }
+     }
+   } finally {
+     // a null response cannot be marshalled; reply with an empty Void message
+     responseObserver.onNext(Void.newBuilder().build());
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Schedule an alarm on the clock; when it fires, the driver client is
+  * notified with the alarm id from the request.
+  * @param request alarm id and timeout in milliseconds
+  * @param responseObserver receives the empty response
+  */
+ @Override
+ public void setAlarm(
+     final AlarmRequest request,
+     final StreamObserver<Void> responseObserver) {
+   try {
+     synchronized (GRPCDriverService.this) {
+       GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler<Alarm>() {
+         @Override
+         public void onNext(final Alarm value) {
+           synchronized (GRPCDriverService.this) {
+             GRPCDriverService.this.clientStub.alarmTrigger(
+                 AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build());
+           }
+         }
+       });
+     }
+   } finally {
+     // a null response cannot be marshalled; reply with an empty Void message
+     responseObserver.onNext(Void.newBuilder().build());
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Perform an operation on an allocated evaluator: either close it, or add
+  * files and libraries, optionally replace its process, and submit a context,
+  * a task, or both. An invalid request is answered with an error and nothing else.
+  * @param request operation to perform on the evaluator named by its evaluator id
+  * @param responseObserver receives the empty response or the validation error
+  */
+ @Override
+ public void allocatedEvaluatorOp(
+     final AllocatedEvaluatorRequest request,
+     final StreamObserver<Void> responseObserver) {
+   synchronized (GRPCDriverService.this) {
+     final AllocatedEvaluator evaluator =
+         GRPCDriverService.this.allocatedEvaluatorMap.get(request.getEvaluatorId());
+     if (evaluator == null) {
+       responseObserver.onError(
+           new IllegalArgumentException("Unknown allocated evaluator " + request.getEvaluatorId()));
+       return;
+     }
+     if (request.getCloseEvaluator()) {
+       evaluator.close();
+     } else {
+       // Protocol buffer getters never return null: an unset configuration is empty.
+       final boolean hasContext = !request.getContextConfiguration().isEmpty();
+       final boolean hasTask = !request.getTaskConfiguration().isEmpty();
+       if (request.getEvaluatorConfiguration().isEmpty()) {
+         responseObserver.onError(
+             new IllegalArgumentException("Evaluator configuration required"));
+         return;
+       }
+       if (!hasContext && !hasTask) {
+         responseObserver.onError(
+             new IllegalArgumentException("Context and/or Task configuration required"));
+         return;
+       }
+       for (final String file : request.getAddFilesList()) {
+         evaluator.addFile(new File(file));
+       }
+       for (final String library : request.getAddLibrariesList()) {
+         evaluator.addLibrary(new File(library));
+       }
+       // only replace the evaluator process when the request asks for it
+       if (request.hasSetProcess()) {
+         final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest =
+             request.getSetProcess();
+         switch (evaluator.getEvaluatorDescriptor().getProcess().getType()) {
+         case JVM:
+           setJVMProcess(evaluator, processRequest);
+           break;
+         case CLR:
+           setCLRProcess(evaluator, processRequest);
+           break;
+         default:
+           throw new RuntimeException("Unknown evaluator process type");
+         }
+       }
+       final AllocatedEvaluatorImpl evaluatorImpl = (AllocatedEvaluatorImpl) evaluator;
+       if (hasContext && hasTask) {
+         evaluatorImpl.submitContextAndTask(
+             request.getEvaluatorConfiguration(),
+             request.getContextConfiguration(),
+             request.getTaskConfiguration());
+       } else if (hasContext) {
+         evaluatorImpl.submitContext(
+             request.getEvaluatorConfiguration(),
+             request.getContextConfiguration());
+       } else {
+         evaluatorImpl.submitTask(
+             request.getEvaluatorConfiguration(),
+             request.getTaskConfiguration());
+       }
+     }
+     // Reached only when no error was reported: a call is terminated exactly
+     // once, and a null response cannot be marshalled.
+     responseObserver.onNext(Void.newBuilder().build());
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Perform the operation selected in the request on an active context: close
+  * it, send it a message, or submit a new context or a new task to it. An
+  * invalid request is answered with an error and nothing else.
+  * @param request operation to perform on the context named by its context id
+  * @param responseObserver receives the empty response or the validation error
+  */
+ @Override
+ public void activeContextOp(
+     final ActiveContextRequest request,
+     final StreamObserver<Void> responseObserver) {
+   synchronized (GRPCDriverService.this) {
+     final ActiveContext context = GRPCDriverService.this.activeContextMap.get(request.getContextId());
+     if (context == null) {
+       responseObserver.onError(
+           new IllegalArgumentException("Context does not exist with id " + request.getContextId()));
+       return;
+     }
+     // The operation case selects exactly one operation, so there is no need to
+     // check for a request that carries both a context and a task configuration.
+     switch (request.getOperationCase()) {
+     case CLOSE_CONTEXT:
+       if (!request.getCloseContext()) {
+         responseObserver.onError(new IllegalArgumentException("Close context operation not set to true"));
+         return;
+       }
+       context.close();
+       break;
+     case MESSAGE:
+       context.sendMessage(request.getMessage().toByteArray());
+       break;
+     case NEW_CONTEXT_REQUEST:
+       ((EvaluatorContext) context).submitContext(request.getNewContextRequest());
+       break;
+     case NEW_TASK_REQUEST:
+       ((EvaluatorContext) context).submitTask(request.getNewTaskRequest());
+       break;
+     default:
+       // without an answer the caller would wait forever
+       responseObserver.onError(new IllegalArgumentException("Context request does not specify an operation"));
+       return;
+     }
+     // Reached only when no error was reported: a call is terminated exactly
+     // once, and a null response cannot be marshalled.
+     responseObserver.onNext(Void.newBuilder().build());
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Close a running task, with or without a message, or send it a message.
+  * A request naming an unknown task is answered with an error and nothing else.
+  * @param request operation to perform on the task named by its task id
+  * @param responseObserver receives the empty response or the validation error
+  */
+ @Override
+ public void runningTaskOp(
+     final RunningTaskRequest request,
+     final StreamObserver<Void> responseObserver) {
+   synchronized (GRPCDriverService.this) {
+     final RunningTask task = GRPCDriverService.this.runningTaskMap.get(request.getTaskId());
+     if (task == null) {
+       responseObserver.onError(
+           new IllegalArgumentException("Task does not exist with id " + request.getTaskId()));
+       return;
+     }
+     // Protocol buffer getters never return null: an unset message is empty.
+     final byte[] message = request.getMessage().toByteArray();
+     if (request.getCloseTask()) {
+       if (message.length > 0) {
+         task.close(message);
+       } else {
+         task.close();
+       }
+     } else {
+       task.send(message);
+     }
+     // Reached only when no error was reported: a call is terminated exactly
+     // once, and a null response cannot be marshalled.
+     responseObserver.onNext(Void.newBuilder().build());
+     responseObserver.onCompleted();
+   }
+ }
+
+ /**
+  * Create a CLR process, apply the settings present in the request and
+  * install it on the evaluator. Settings the request leaves unset keep the
+  * values of the newly created process.
+  * @param evaluator evaluator that receives the process
+  * @param processRequest requested process settings
+  */
+ private void setCLRProcess(
+     final AllocatedEvaluator evaluator,
+     final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
+   final CLRProcess process = GRPCDriverService.this.clrProcessFactory.newEvaluatorProcess();
+   if (processRequest.getMemoryMb() > 0) {
+     process.setMemory(processRequest.getMemoryMb());
+   }
+   // Protocol buffer getters never return null: an unset string is empty, and
+   // applying it would overwrite the process setting with an empty value.
+   if (!processRequest.getConfigurationFileName().isEmpty()) {
+     process.setConfigurationFileName(processRequest.getConfigurationFileName());
+   }
+   if (!processRequest.getStandardOut().isEmpty()) {
+     process.setStandardOut(processRequest.getStandardOut());
+   }
+   if (!processRequest.getStandardErr().isEmpty()) {
+     process.setStandardErr(processRequest.getStandardErr());
+   }
+   evaluator.setProcess(process);
+ }
+
+ /**
+  * Create a JVM process, apply the settings and options present in the
+  * request and install it on the evaluator. Settings the request leaves
+  * unset keep the values of the newly created process.
+  * @param evaluator evaluator that receives the process
+  * @param processRequest requested process settings
+  */
+ private void setJVMProcess(
+     final AllocatedEvaluator evaluator,
+     final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
+   final JVMProcess process = GRPCDriverService.this.jvmProcessFactory.newEvaluatorProcess();
+   if (processRequest.getMemoryMb() > 0) {
+     process.setMemory(processRequest.getMemoryMb());
+   }
+   // Protocol buffer getters never return null: an unset string is empty, and
+   // applying it would overwrite the process setting with an empty value.
+   if (!processRequest.getConfigurationFileName().isEmpty()) {
+     process.setConfigurationFileName(processRequest.getConfigurationFileName());
+   }
+   if (!processRequest.getStandardOut().isEmpty()) {
+     process.setStandardOut(processRequest.getStandardOut());
+   }
+   if (!processRequest.getStandardErr().isEmpty()) {
+     process.setStandardErr(processRequest.getStandardErr());
+   }
+   for (final String option : processRequest.getOptionsList()) {
+     process.addOption(option);
+   }
+   evaluator.setProcess(process);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/package-info.java
new file mode 100644
index 0000000..a94328d
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * GRPC implementation for driver bridge service.
+ */
+package org.apache.reef.bridge.service.grpc;
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/package-info.java
new file mode 100644
index 0000000..25a8918
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The Java-side of the CLR/Java bridge interop via gRPC/Protocol Buffers.
+ */
+package org.apache.reef.bridge.service;
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/DriverClientCommand.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/DriverClientCommand.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/DriverClientCommand.java
new file mode 100644
index 0000000..255f60d
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/DriverClientCommand.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.service.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Named parameter holding the command used to launch the bridge driver process.
+ */
+@NamedParameter(doc = "The command to launch bridge driver process",
+ short_name = "command")
+public final class DriverClientCommand implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/package-info.java
new file mode 100644
index 0000000..6a3b956
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver bridge service parameters.
+ */
+package org.apache.reef.bridge.service.parameters;
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java
index f52cc7f..d223717 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java
@@ -29,7 +29,7 @@ import org.apache.reef.driver.evaluator.EvaluatorProcess;
*/
@Private
@DriverSide
-final class EvaluatorDescriptorImpl implements EvaluatorDescriptor {
+public final class EvaluatorDescriptorImpl implements EvaluatorDescriptor {
private final NodeDescriptor nodeDescriptor;
private final int megaBytes;
@@ -37,7 +37,7 @@ final class EvaluatorDescriptorImpl implements EvaluatorDescriptor {
private EvaluatorProcess process;
private final String runtimeName;
- EvaluatorDescriptorImpl(final NodeDescriptor nodeDescriptor,
+ public EvaluatorDescriptorImpl(final NodeDescriptor nodeDescriptor,
final int megaBytes,
final int numberOfCores,
final EvaluatorProcess process,
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
index 5f81a5d..bd7784b 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
@@ -108,7 +108,7 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
if (classPath != null && !classPath.isEmpty()) {
add("-classpath");
- add(classPath);
+ add("\"" + classPath + "\"");
}
propagateProperties(this, true, "proc_reef");
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java
index b6a7aa0..e4e7fb6 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java
@@ -24,7 +24,8 @@ import org.apache.reef.tang.annotations.NamedParameter;
/**
* First tcp port number to try.
*/
-@NamedParameter(doc = "First tcp port number to try", default_value = TcpPortRangeBegin.DEFAULT_VALUE)
+@NamedParameter(doc = "First tcp port number to try",
+ short_name = "tcp_port_range_begin", default_value = TcpPortRangeBegin.DEFAULT_VALUE)
public final class TcpPortRangeBegin implements Name<Integer> {
public static final String DEFAULT_VALUE = "10000";
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java
index ee5879d..23b65fb 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java
@@ -24,7 +24,8 @@ import org.apache.reef.tang.annotations.NamedParameter;
/**
* Number of tcp ports in the range.
*/
-@NamedParameter(doc = "Number of tcp ports in the range", default_value = TcpPortRangeCount.DEFAULT_VALUE)
+@NamedParameter(doc = "Number of tcp ports in the range",
+ short_name = "tcp_port_range_count", default_value = TcpPortRangeCount.DEFAULT_VALUE)
public final class TcpPortRangeCount implements Name<Integer> {
public static final String DEFAULT_VALUE = "10000";
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java
index 60cedea..20be605 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java
@@ -24,7 +24,8 @@ import org.apache.reef.tang.annotations.NamedParameter;
/**
* Max number tries for port numbers.
*/
-@NamedParameter(doc = "Max number tries for port numbers", default_value = TcpPortRangeTryCount.DEFAULT_VALUE)
+@NamedParameter(doc = "Max number tries for port numbers",
+ short_name = "tcp_port_range_try_count", default_value = TcpPortRangeTryCount.DEFAULT_VALUE)
public final class TcpPortRangeTryCount implements Name<Integer> {
public static final String DEFAULT_VALUE = "1000";
http://git-wip-us.apache.org/repos/asf/reef/blob/bb2a7345/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8d58a93..bd35fbd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -289,6 +289,8 @@ under the License.
<exclude>REEF_STANDALONE_RUNTIME/**</exclude>
<!-- Error logs -->
<exclude>**/*.log</exclude>
+ <!-- Shading configuration -->
+ <exclude>**/dependency-reduced-pom.xml</exclude>
<!-- The Visual Studio and Nuget build files -->
<exclude>**/.vs/**</exclude>
<exclude>**/*.sln*</exclude>
@@ -298,6 +300,7 @@ under the License.
<exclude>**/*.sdf*</exclude>
<exclude>**/*.snk</exclude>
<exclude>**/*.opendb</exclude>
+ <exclude>**/*.resx</exclude>
<!-- The below are auto generated during the .Net build -->
<exclude>**/bin/**</exclude>
<exclude>**/obj/**</exclude>
@@ -769,6 +772,7 @@ under the License.
<module>lang/java/reef-applications</module>
<module>lang/java/reef-bridge-client</module>
<module>lang/java/reef-bridge-java</module>
+ <module>lang/java/reef-bridge-proto-java</module>
<module>lang/java/reef-checkpoint</module>
<module>lang/java/reef-common</module>
<module>lang/java/reef-examples</module>