You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by tc...@apache.org on 2018/05/23 20:45:29 UTC
[1/2] reef git commit: [Trivial] Add Sergiy test cases for the bridge.
Repository: reef
Updated Branches:
refs/heads/REEF-335 3b1651917 -> 8f3dafa9f
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-tests/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/pom.xml b/lang/java/reef-tests/pom.xml
index b096e39..d39734b 100644
--- a/lang/java/reef-tests/pom.xml
+++ b/lang/java/reef-tests/pom.xml
@@ -34,6 +34,7 @@ under the License.
<properties>
<rootPath>${basedir}/../../..</rootPath>
+ <protobuf.version>3.5.1</protobuf.version>
</properties>
<dependencies>
@@ -79,6 +80,16 @@ under the License.
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>reef-bridge-proto-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>vortex</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailBridgeClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailBridgeClient.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailBridgeClient.java
new file mode 100644
index 0000000..84653bd
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailBridgeClient.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.fail.driver;
+
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.bridge.driver.client.DriverClientConfiguration;
+import org.apache.reef.bridge.proto.ClientProtocol;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tests.TestDriverLauncher;
+import org.apache.reef.tests.fail.util.FailBridgeClientUtils;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.OSUtils;
+
+import java.io.IOException;
+
+/**
+ * Client that runs {@link FailDriver} on top of the driver bridge, configured to fail
+ * on a given class (see {@link FailDriver.FailMsgClassName}).
+ */
+@Private
+@ClientSide
+public final class FailBridgeClient {
+
+  private static final Tang TANG = Tang.Factory.getTang();
+
+  /**
+   * Build the driver client configuration: every {@link FailDriver} handler is bound to
+   * its event, and the name of the class to fail on is bound to FailMsgClassName.
+   * @param failMsgClass class on which FailDriver should fail
+   * @return driver client configuration
+   */
+  private static Configuration buildDriverConfig(final Class<?> failMsgClass) {
+
+    final Configuration driverConfig = DriverClientConfiguration.CONF
+        .set(DriverClientConfiguration.ON_DRIVER_STARTED, FailDriver.StartHandler.class)
+        .set(DriverClientConfiguration.ON_DRIVER_STOP, FailDriver.StopHandler.class)
+        .set(DriverClientConfiguration.ON_EVALUATOR_ALLOCATED, FailDriver.AllocatedEvaluatorHandler.class)
+        .set(DriverClientConfiguration.ON_EVALUATOR_COMPLETED, FailDriver.CompletedEvaluatorHandler.class)
+        .set(DriverClientConfiguration.ON_EVALUATOR_FAILED, FailDriver.FailedEvaluatorHandler.class)
+        .set(DriverClientConfiguration.ON_CONTEXT_ACTIVE, FailDriver.ActiveContextHandler.class)
+        .set(DriverClientConfiguration.ON_CONTEXT_MESSAGE, FailDriver.ContextMessageHandler.class)
+        .set(DriverClientConfiguration.ON_CONTEXT_CLOSED, FailDriver.ClosedContextHandler.class)
+        .set(DriverClientConfiguration.ON_CONTEXT_FAILED, FailDriver.FailedContextHandler.class)
+        .set(DriverClientConfiguration.ON_TASK_RUNNING, FailDriver.RunningTaskHandler.class)
+        .set(DriverClientConfiguration.ON_TASK_SUSPENDED, FailDriver.SuspendedTaskHandler.class)
+        .set(DriverClientConfiguration.ON_TASK_MESSAGE, FailDriver.TaskMessageHandler.class)
+        .set(DriverClientConfiguration.ON_TASK_FAILED, FailDriver.FailedTaskHandler.class)
+        .set(DriverClientConfiguration.ON_TASK_COMPLETED, FailDriver.CompletedTaskHandler.class)
+        .build();
+
+    return TANG.newConfigurationBuilder(driverConfig)
+        .bindNamedParameter(FailDriver.FailMsgClassName.class, failMsgClass.getName())
+        .build();
+  }
+
+  /**
+   * Operating system value for the bridge protocol that matches the local machine.
+   * @return WINDOWS when running on Windows, LINUX otherwise
+   */
+  private static ClientProtocol.DriverClientConfiguration.OS getLocalOperatingSystem() {
+    return OSUtils.isWindows() ?
+        ClientProtocol.DriverClientConfiguration.OS.WINDOWS :
+        ClientProtocol.DriverClientConfiguration.OS.LINUX;
+  }
+
+  /**
+   * Run FailDriver through the bridge with a default driver client configuration.
+   * <p>
+   * The default configuration uses job id {@code "Fail_" + failMsgClass.getSimpleName()}.
+   * It adds the location of {@link FailDriver} as a global library and sets the operating
+   * system of the local machine.
+   * @param failMsgClass class on which FailDriver should fail
+   * @param runtimeConfig runtime configuration
+   * @param timeOut timeout passed to the test driver launcher
+   * @return status of the launched job
+   * @throws IOException on I/O errors while preparing the driver service configuration
+   * @throws InjectionException if a configuration cannot be injected
+   */
+  public static LauncherStatus runClient(
+      final Class<?> failMsgClass,
+      final Configuration runtimeConfig,
+      final int timeOut) throws IOException, InjectionException {
+    final ClientProtocol.DriverClientConfiguration driverClientConfiguration =
+        ClientProtocol.DriverClientConfiguration.newBuilder()
+            .setJobid("Fail_" + failMsgClass.getSimpleName())
+            .addGlobalLibraries(EnvironmentUtils.getClassLocation(FailDriver.class))
+            .setOperatingSystem(getLocalOperatingSystem())
+            .build();
+
+    return runClient(failMsgClass, runtimeConfig, driverClientConfiguration, timeOut);
+  }
+
+  /**
+   * Run FailDriver through the bridge with the given driver client protocol configuration.
+   * @param failMsgClass class on which FailDriver should fail
+   * @param runtimeConfig runtime configuration
+   * @param driverClientConfiguration driver client protocol configuration
+   * @param timeOut timeout passed to the test driver launcher
+   * @return status of the launched job
+   * @throws InjectionException if a configuration cannot be injected
+   * @throws IOException on I/O errors while preparing the driver service configuration
+   */
+  public static LauncherStatus runClient(
+      final Class<?> failMsgClass,
+      final Configuration runtimeConfig,
+      final ClientProtocol.DriverClientConfiguration driverClientConfiguration,
+      final int timeOut) throws InjectionException, IOException {
+    final Configuration driverServiceConfiguration =
+        FailBridgeClientUtils.setupDriverService(
+            runtimeConfig,
+            buildDriverConfig(failMsgClass),
+            driverClientConfiguration);
+    return TestDriverLauncher.getLauncher(runtimeConfig).run(driverServiceConfiguration, timeOut);
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private FailBridgeClient() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java
index 8b4c8de..1bd19f1 100644
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java
@@ -281,6 +281,7 @@ public final class FailDriver {
throw new DriverSideFailure("Unexpected state: " + state);
}
// After a delay, send message or suspend the task:
+ LOG.log(Level.INFO, "Schedule alarm on state {0}", state.name());
clock.scheduleAlarm(MSG_DELAY, new AlarmHandler());
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/BridgeClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/BridgeClient.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/BridgeClient.java
new file mode 100644
index 0000000..5317a7b
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/BridgeClient.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.bridge.driver.client.DriverClientConfiguration;
+import org.apache.reef.bridge.proto.ClientProtocol;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.task.Task;
+import org.apache.reef.tests.TestDriverLauncher;
+import org.apache.reef.tests.fail.util.FailBridgeClientUtils;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.OSUtils;
+
+import java.io.IOException;
+
+/**
+ * Client that runs the failure-test {@link Driver} through the driver bridge,
+ * launching a given failing {@link Task} implementation.
+ */
+@Private
+@ClientSide
+public final class BridgeClient {
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private BridgeClient() {
+  }
+
+  /**
+   * Operating system value for the bridge protocol that matches the local machine.
+   * @return WINDOWS when running on Windows, LINUX otherwise
+   */
+  private static ClientProtocol.DriverClientConfiguration.OS getLocalOperatingSystem() {
+    return OSUtils.isWindows() ?
+        ClientProtocol.DriverClientConfiguration.OS.WINDOWS :
+        ClientProtocol.DriverClientConfiguration.OS.LINUX;
+  }
+
+  /**
+   * Run the failing task through the bridge with a default driver client configuration.
+   * <p>
+   * The default configuration uses job id {@code "Fail_" + failTaskClass.getSimpleName()}.
+   * It adds the location of {@link Driver} as a global library and sets the operating
+   * system of the local machine.
+   * @param failTaskClass task implementation to launch
+   * @param runtimeConfig runtime configuration
+   * @param timeOut timeout passed to the test driver launcher
+   * @return status of the launched job
+   * @throws IOException on I/O errors while preparing the driver service configuration
+   * @throws InjectionException if a configuration cannot be injected
+   */
+  public static LauncherStatus run(
+      final Class<? extends Task> failTaskClass,
+      final Configuration runtimeConfig,
+      final int timeOut) throws IOException, InjectionException {
+    final ClientProtocol.DriverClientConfiguration driverClientConfiguration =
+        ClientProtocol.DriverClientConfiguration.newBuilder()
+            .setJobid("Fail_" + failTaskClass.getSimpleName())
+            .addGlobalLibraries(EnvironmentUtils.getClassLocation(Driver.class))
+            .setOperatingSystem(getLocalOperatingSystem())
+            .build();
+
+    return run(failTaskClass, runtimeConfig, driverClientConfiguration, timeOut);
+  }
+
+  /**
+   * Run the failing task through the bridge with the given driver client protocol configuration.
+   * @param failTaskClass task implementation to launch; its simple name is bound to
+   *                      {@link Driver.FailTaskName}
+   * @param runtimeConfig runtime configuration
+   * @param driverClientConfiguration driver client protocol configuration
+   * @param timeOut timeout passed to the test driver launcher
+   * @return status of the launched job
+   * @throws InjectionException if a configuration cannot be injected
+   * @throws IOException on I/O errors while preparing the driver service configuration
+   */
+  public static LauncherStatus run(
+      final Class<? extends Task> failTaskClass,
+      final Configuration runtimeConfig,
+      final ClientProtocol.DriverClientConfiguration driverClientConfiguration,
+      final int timeOut) throws InjectionException, IOException {
+
+    final Configuration driverConfig = DriverClientConfiguration.CONF
+        .set(DriverClientConfiguration.ON_EVALUATOR_ALLOCATED, Driver.AllocatedEvaluatorHandler.class)
+        .set(DriverClientConfiguration.ON_TASK_RUNNING, Driver.RunningTaskHandler.class)
+        .set(DriverClientConfiguration.ON_CONTEXT_ACTIVE, Driver.ActiveContextHandler.class)
+        .set(DriverClientConfiguration.ON_DRIVER_STARTED, Driver.StartHandler.class)
+        .build();
+
+    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+    cb.addConfiguration(driverConfig);
+    cb.bindNamedParameter(Driver.FailTaskName.class, failTaskClass.getSimpleName());
+
+    final Configuration driverServiceConfiguration =
+        FailBridgeClientUtils.setupDriverService(
+            runtimeConfig,
+            cb.build(),
+            driverClientConfiguration);
+    return TestDriverLauncher.getLauncher(runtimeConfig).run(driverServiceConfiguration, timeOut);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/util/FailBridgeClientUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/util/FailBridgeClientUtils.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/util/FailBridgeClientUtils.java
new file mode 100644
index 0000000..b8c7811
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/util/FailBridgeClientUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.fail.util;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.bridge.driver.client.JavaDriverClientLauncher;
+import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider;
+import org.apache.reef.bridge.driver.service.grpc.GRPCDriverServiceConfigurationProvider;
+import org.apache.reef.bridge.proto.ClientProtocol;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Fail bridge client utilities.
+ */
+@Private
+@ClientSide
+public final class FailBridgeClientUtils {
+
+  private static final Tang TANG = Tang.Factory.getTang();
+
+  /**
+   * Setup the bridge service configuration.
+   * <p>
+   * The driver client configuration is serialized to a temporary file, and that file is
+   * added to the local files of the job. The driver client launch command refers to the
+   * file by name under the runtime's local folder.
+   * @param runtimeConfiguration runtime configuration; used to obtain the REEF file names
+   *                             and the driver classpath
+   * @param driverClientConfiguration driver client configuration
+   * @param driverClientConfigurationProto protocol arguments; a copy of them is extended with
+   *                                       the launch command and the local file
+   * @return bridge service configuration
+   * @throws IOException if the temporary configuration file cannot be created or written
+   * @throws InjectionException if an instance cannot be injected from the given configurations
+   */
+  public static Configuration setupDriverService(
+      final Configuration runtimeConfiguration,
+      final Configuration driverClientConfiguration,
+      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
+      throws IOException, InjectionException {
+    final ClientProtocol.DriverClientConfiguration.Builder builder =
+        ClientProtocol.DriverClientConfiguration.newBuilder(driverClientConfigurationProto);
+
+    // Write driver client configuration to a temporary file.
+    final File driverClientConfigurationFile = File.createTempFile("driverclient", ".conf");
+    // Do not leave one temporary file behind per test run; it only has to live as long as
+    // this client JVM.
+    driverClientConfigurationFile.deleteOnExit();
+    final Injector driverClientInjector = TANG.newInjector(driverClientConfiguration);
+    final ConfigurationSerializer configurationSerializer =
+        driverClientInjector.getInstance(ConfigurationSerializer.class);
+    configurationSerializer.toFile(driverClientConfiguration, driverClientConfigurationFile);
+
+    final Injector runtimeInjector = TANG.newInjector(runtimeConfiguration);
+    final REEFFileNames fileNames = runtimeInjector.getInstance(REEFFileNames.class);
+    final ClasspathProvider classpathProvider = runtimeInjector.getInstance(ClasspathProvider.class);
+    final List<String> driverClasspath = classpathProvider.getDriverClasspath();
+
+    // Windows classpath entries are ';'-separated and the whole value is quoted;
+    // everywhere else entries are ':'-separated.
+    final boolean isWindows = driverClientConfigurationProto.getOperatingSystem() ==
+        ClientProtocol.DriverClientConfiguration.OS.WINDOWS;
+    final String classPath = isWindows ?
+        "\"" + StringUtils.join(driverClasspath, ";") + "\"" :
+        StringUtils.join(driverClasspath, ":");
+
+    final List<String> launchCommand = new JavaLaunchCommandBuilder(JavaDriverClientLauncher.class, null)
+        .setConfigurationFilePaths(
+            Collections.singletonList("./" + fileNames.getLocalFolderPath() + "/" +
+                driverClientConfigurationFile.getName()))
+        .setJavaPath("java")
+        .setClassPath(classPath)
+        .build();
+    final String cmd = StringUtils.join(launchCommand, ' ');
+    builder.setDriverClientLaunchCommand(cmd);
+    builder.addLocalFiles(driverClientConfigurationFile.getAbsolutePath());
+
+    final IDriverServiceConfigurationProvider driverServiceConfigurationProvider = TANG.newInjector(
+        TANG.newConfigurationBuilder()
+            .bindImplementation(IDriverServiceConfigurationProvider.class,
+                GRPCDriverServiceConfigurationProvider.class)
+            .build())
+        .getInstance(IDriverServiceConfigurationProvider.class);
+    return driverServiceConfigurationProvider.getDriverServiceConfiguration(builder.build());
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private FailBridgeClientUtils() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/util/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/util/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/util/package-info.java
new file mode 100644
index 0000000..d271af2
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Utilities for running the failure tests through the driver bridge.
+ */
+package org.apache.reef.tests.fail.util;
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailBridgeDriverTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailBridgeDriverTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailBridgeDriverTest.java
new file mode 100644
index 0000000..cf81007
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailBridgeDriverTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.fail;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.SuspendedTask;
+import org.apache.reef.driver.task.TaskMessage;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.tests.TestUtils;
+import org.apache.reef.tests.fail.driver.FailBridgeClient;
+import org.apache.reef.tests.fail.driver.FailClient;
+import org.apache.reef.tests.fail.driver.FailDriver;
+import org.apache.reef.tests.library.exceptions.SimulatedDriverFailure;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Run FailDriver through the driver bridge with different types of failures.
+ */
+public final class FailBridgeDriverTest {
+
+  private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+  @Before
+  public void setUp() throws Exception {
+    this.testEnvironment.setUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    this.testEnvironment.tearDown();
+  }
+
+  /**
+   * Run FailDriver through the bridge, failing on the given class, and assert that the
+   * job fails with a {@link SimulatedDriverFailure}.
+   * @param clazz class on which FailDriver should fail
+   */
+  private void failOn(final Class<?> clazz) throws BindException, InjectionException {
+    try {
+      TestUtils.assertLauncherFailure(
+          FailBridgeClient.runClient(clazz,
+              this.testEnvironment.getRuntimeConfiguration(), this.testEnvironment.getTestTimeout()),
+          SimulatedDriverFailure.class);
+    } catch (final IOException e) {
+      // Keep the test method signatures free of IOException; the cause is preserved.
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testFailDriverConstructor() throws BindException, InjectionException {
+    failOn(FailDriver.class);
+  }
+
+  @Test
+  public void testFailDriverStart() throws BindException, InjectionException {
+    failOn(StartTime.class);
+  }
+
+  @Test
+  public void testFailDriverAllocatedEvaluator() throws BindException, InjectionException {
+    failOn(AllocatedEvaluator.class);
+  }
+
+  @Test
+  public void testFailDriverActiveContext() throws BindException, InjectionException {
+    failOn(ActiveContext.class);
+  }
+
+  @Test
+  public void testFailDriverRunningTask() throws BindException, InjectionException {
+    failOn(RunningTask.class);
+  }
+
+  @Test
+  public void testFailDriverTaskMessage() throws BindException, InjectionException {
+    failOn(TaskMessage.class);
+  }
+
+  @Test
+  public void testFailDriverSuspendedTask() throws BindException, InjectionException {
+    failOn(SuspendedTask.class);
+  }
+
+  @Test
+  public void testFailDriverCompletedTask() throws BindException, InjectionException {
+    failOn(CompletedTask.class);
+  }
+
+  @Test
+  public void testFailDriverCompletedEvaluator() throws BindException, InjectionException {
+    failOn(CompletedEvaluator.class);
+  }
+
+  @Test
+  public void testFailDriverAlarm() throws BindException, InjectionException {
+    failOn(Alarm.class);
+  }
+
+  @Test
+  public void testFailDriverStop() throws BindException, InjectionException {
+    failOn(StopTime.class);
+  }
+
+  /**
+   * Baseline: FailDriver launched with the regular (non-bridge) {@link FailClient}
+   * completes when it is not asked to fail on any class it uses.
+   */
+  @Test
+  public void testDriverCompleted() throws BindException, InjectionException {
+    final Configuration runtimeConfiguration = this.testEnvironment.getRuntimeConfiguration();
+    // FailDriverTest can be replaced with any other class never used in FailDriver
+    final LauncherStatus status = FailClient.runClient(
+        FailDriverTest.class, runtimeConfiguration, this.testEnvironment.getTestTimeout());
+    Assert.assertEquals(LauncherStatus.COMPLETED, status);
+  }
+
+  /**
+   * FailDriver launched through the bridge completes when it is not asked to fail
+   * on any class it uses.
+   */
+  @Test
+  public void testBridgeDriverCompleted() throws BindException, InjectionException, IOException {
+    final Configuration runtimeConfiguration = this.testEnvironment.getRuntimeConfiguration();
+    // FailDriverTest can be replaced with any other class never used in FailDriver
+    final LauncherStatus status = FailBridgeClient.runClient(
+        FailDriverTest.class, runtimeConfiguration, this.testEnvironment.getTestTimeout());
+    Assert.assertEquals(LauncherStatus.COMPLETED, status);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailBridgeTaskTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailBridgeTaskTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailBridgeTaskTest.java
new file mode 100644
index 0000000..001b2a6
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailBridgeTaskTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail;
+
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.task.Task;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.tests.TestUtils;
+import org.apache.reef.tests.fail.task.*;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Failure tests for Tasks launched through the driver bridge. Each test runs a Task
+ * that fails at a different point of its lifecycle and expects the job to fail with a
+ * {@link SimulatedTaskFailure}.
+ */
+public final class FailBridgeTaskTest {
+
+  private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+  @Before
+  public void setUp() throws Exception {
+    this.testEnvironment.setUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    this.testEnvironment.tearDown();
+  }
+
+  /**
+   * Launch the given Task through {@link BridgeClient} and assert that the job fails
+   * with a {@link SimulatedTaskFailure}.
+   * @param taskClass Task implementation expected to fail
+   */
+  private void assertTaskFailsThroughBridge(
+      final Class<? extends Task> taskClass) throws BindException, InjectionException, IOException {
+    TestUtils.assertLauncherFailure(
+        BridgeClient.run(
+            taskClass,
+            this.testEnvironment.getRuntimeConfiguration(),
+            this.testEnvironment.getTestTimeout()),
+        SimulatedTaskFailure.class);
+  }
+
+  @Test
+  public void testFailTask() throws BindException, InjectionException, IOException {
+    this.assertTaskFailsThroughBridge(FailTask.class);
+  }
+
+  @Test
+  public void testFailTaskCall() throws BindException, InjectionException, IOException {
+    this.assertTaskFailsThroughBridge(FailTaskCall.class);
+  }
+
+  @Test
+  public void testFailTaskMsg() throws BindException, InjectionException, IOException {
+    this.assertTaskFailsThroughBridge(FailTaskMsg.class);
+  }
+
+  @Test
+  public void testFailTaskSuspend() throws BindException, InjectionException, IOException {
+    this.assertTaskFailsThroughBridge(FailTaskSuspend.class);
+  }
+
+  @Test
+  public void testFailTaskStart() throws BindException, InjectionException, IOException {
+    this.assertTaskFailsThroughBridge(FailTaskStart.class);
+  }
+
+  @Test
+  public void testFailTaskStop() throws BindException, InjectionException, IOException {
+    this.assertTaskFailsThroughBridge(FailTaskStop.class);
+  }
+
+  @Test
+  public void testFailTaskClose() throws BindException, InjectionException, IOException {
+    this.assertTaskFailsThroughBridge(FailTaskClose.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailTestSuite.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailTestSuite.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailTestSuite.java
index 9967d9b..a3481e4 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailTestSuite.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailTestSuite.java
@@ -29,7 +29,9 @@ import org.junit.runners.Suite;
// FailTaskTest.class,
FailDriverTest.class,
FailDriverDelayedMsgTest.class,
- DriverFailOnFailTest.class
+ DriverFailOnFailTest.class,
+ FailBridgeDriverTest.class,
+ FailBridgeTaskTest.class
})
public final class FailTestSuite {
}
[2/2] reef git commit: [Trivial] Add Sergiy test cases for the bridge.
Posted by tc...@apache.org.
[Trivial] Add Sergiy's test cases for the bridge.
Failure tests added for the bridge.
Pull Request:
Closes #1465
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/8f3dafa9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/8f3dafa9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/8f3dafa9
Branch: refs/heads/REEF-335
Commit: 8f3dafa9fd39f5c172a0957a9935ab2d91c0d542
Parents: 3b16519
Author: Tyson Condie <tc...@apache.org>
Authored: Mon May 21 14:29:02 2018 -0700
Committer: Tyson Condie <tc...@apache.org>
Committed: Wed May 23 13:24:08 2018 -0700
----------------------------------------------------------------------
.../proto/bridge/DriverClientProtocol.proto | 2 +-
.../proto/bridge/DriverCommonProtocol.proto | 11 +-
.../proto/bridge/DriverServiceProtocol.proto | 14 +-
.../bridge/client/DriverServiceLauncher.java | 86 +++--
.../bridge/driver/client/DriverClientClock.java | 3 +
.../driver/client/DriverClientDispatcher.java | 33 +-
.../client/DriverClientEvaluatorRequestor.java | 8 +-
.../client/DriverClientExceptionHandler.java | 8 +-
.../driver/client/IDriverClientService.java | 7 +
.../driver/client/IDriverServiceClient.java | 9 +
.../driver/client/JavaDriverClientLauncher.java | 22 +-
.../client/events/FailedContextBridge.java | 10 +-
.../driver/client/events/RunningTaskBridge.java | 4 +-
.../client/events/SuspendedTaskBridge.java | 56 +++
.../driver/client/grpc/DriverClientService.java | 107 ++++--
.../driver/client/grpc/DriverServiceClient.java | 65 +++-
.../bridge/driver/launch/IDriverLauncher.java | 3 +-
.../launch/azbatch/AzureBatchLauncher.java | 28 +-
.../driver/launch/local/LocalLauncher.java | 5 +-
.../bridge/driver/launch/yarn/YarnLauncher.java | 3 +-
.../service/DriverServiceConfiguration.java | 1 +
.../driver/service/grpc/GRPCDriverService.java | 353 +++++++++++++------
lang/java/reef-tests/pom.xml | 11 +
.../tests/fail/driver/FailBridgeClient.java | 103 ++++++
.../reef/tests/fail/driver/FailDriver.java | 1 +
.../reef/tests/fail/task/BridgeClient.java | 92 +++++
.../tests/fail/util/FailBridgeClientUtils.java | 103 ++++++
.../reef/tests/fail/util/package-info.java | 22 ++
.../reef/tests/fail/FailBridgeDriverTest.java | 141 ++++++++
.../reef/tests/fail/FailBridgeTaskTest.java | 95 +++++
.../apache/reef/tests/fail/FailTestSuite.java | 4 +-
31 files changed, 1166 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/common/proto/bridge/DriverClientProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/bridge/DriverClientProtocol.proto b/lang/common/proto/bridge/DriverClientProtocol.proto
index 6d0d08b..86d8836 100644
--- a/lang/common/proto/bridge/DriverClientProtocol.proto
+++ b/lang/common/proto/bridge/DriverClientProtocol.proto
@@ -37,7 +37,7 @@ service DriverClient {
// Request for resources
rpc StartHandler (StartTimeInfo) returns (Void) {}
- rpc StopHandler (StopTimeInfo) returns (Void) {}
+ rpc StopHandler (StopTimeInfo) returns (ExceptionInfo) {}
rpc AlarmTrigger (AlarmTriggerInfo) returns (Void) {}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/common/proto/bridge/DriverCommonProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/bridge/DriverCommonProtocol.proto b/lang/common/proto/bridge/DriverCommonProtocol.proto
index 7ec8905..e67ca73 100644
--- a/lang/common/proto/bridge/DriverCommonProtocol.proto
+++ b/lang/common/proto/bridge/DriverCommonProtocol.proto
@@ -30,15 +30,18 @@ package driverbridge;
message Void {}
message ExceptionInfo {
+ // no error present if true
+ bool no_error = 1;
+
// Exception name/type
- string name = 1;
+ string name = 2;
// Exception message
- string message = 2;
+ string message = 3;
// Stack trace
- repeated string stack_trace = 3;
+ repeated string stack_trace = 4;
// Data associated with exception
- bytes data = 4;
+ bytes data = 5;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/common/proto/bridge/DriverServiceProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/bridge/DriverServiceProtocol.proto b/lang/common/proto/bridge/DriverServiceProtocol.proto
index 7f6da24..0bc72ef 100644
--- a/lang/common/proto/bridge/DriverServiceProtocol.proto
+++ b/lang/common/proto/bridge/DriverServiceProtocol.proto
@@ -59,6 +59,9 @@ message DriverClientRegistration {
// The client's server port
int32 port = 2;
+
+ // Error during initialization
+ ExceptionInfo exception = 5;
}
// The request message containing resource request.
@@ -147,9 +150,12 @@ message ActiveContextRequest {
message RunningTaskRequest {
string task_id = 1;
- // close the task
- bool close_task = 2;
+ bytes message = 2;
- // send task a message
- bytes message = 3;
+ enum Operation {
+ CLOSE = 0;
+ SUSPEND = 1;
+ SEND_MESSAGE = 2;
+ }
+ Operation operation = 5;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java
index 5a7342c..23a73f2 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java
@@ -18,6 +18,7 @@
*/
package org.apache.reef.bridge.client;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.commons.lang.StringUtils;
import org.apache.reef.bridge.driver.launch.IDriverLauncher;
@@ -28,6 +29,7 @@ import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider
import org.apache.reef.bridge.driver.service.grpc.GRPCDriverServiceConfigurationProvider;
import org.apache.reef.bridge.driver.client.JavaDriverClientLauncher;
import org.apache.reef.bridge.proto.ClientProtocol;
+import org.apache.reef.client.LauncherStatus;
import org.apache.reef.runtime.azbatch.AzureBatchClasspathProvider;
import org.apache.reef.runtime.common.files.*;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
@@ -43,7 +45,6 @@ import org.apache.reef.tang.formats.ConfigurationSerializer;
import java.io.File;
import java.io.IOException;
-import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
@@ -70,7 +71,7 @@ public final class DriverServiceLauncher {
throw new RuntimeException("Do not instantiate this class!");
}
- public static void submit(
+ public static LauncherStatus submit(
final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto,
final Configuration driverClientConfiguration)
throws InjectionException, IOException {
@@ -118,22 +119,13 @@ public final class DriverServiceLauncher {
.setClassPath(driverClientConfigurationProto.getOperatingSystem() ==
ClientProtocol.DriverClientConfiguration.OS.WINDOWS ?
StringUtils.join(classpathProvider.getDriverClasspath(), ";") :
- StringUtils.join(classpathProvider.getDriverClasspath(), ":"))
+ StringUtils.join(classpathProvider.getDriverClasspath(), ":"))
.build();
final String cmd = StringUtils.join(launchCommand, ' ');
builder.setDriverClientLaunchCommand(cmd);
builder.addLocalFiles(driverClientConfigurationFile.getAbsolutePath());
- // call main()
- final File driverClientConfFile = File.createTempFile("driverclient", ".json");
- try {
- try (PrintWriter out = new PrintWriter(driverClientConfFile)) {
- out.println(JsonFormat.printer().print(builder.build()));
- }
- main(new String[]{driverClientConfFile.getAbsolutePath()});
- } finally {
- driverClientConfFile.deleteOnExit();
- }
+ return launch(driverClientConfigurationProto);
} finally {
driverClientConfigurationFile.deleteOnExit();
}
@@ -160,7 +152,7 @@ public final class DriverServiceLauncher {
.newInjector(yarnJobSubmissionClientConfig).getInstance(YarnLauncher.class);
}
- private static IDriverLauncher getAzureBatchDriverServiceLauncher() throws InjectionException {
+ private static IDriverLauncher getAzureBatchDriverServiceLauncher() throws InjectionException {
final Configuration azbatchJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder()
.bindImplementation(IDriverLauncher.class, AzureBatchLauncher.class)
.bindImplementation(IDriverServiceConfigurationProvider.class,
@@ -169,48 +161,52 @@ public final class DriverServiceLauncher {
return Tang.Factory.getTang().newInjector(azbatchJobSubmissionClientConfig).getInstance(AzureBatchLauncher.class);
}
- /**
- * Main method that launches the REEF job.
- *
- * @param args command line parameters.
- */
- public static void main(final String[] args) {
+ private static LauncherStatus launch(
+ final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) {
try {
- if (args.length != 1) {
- LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() +
- " accepts single argument referencing a file that contains a client protocol buffer driver configuration");
- }
- final String content;
- try {
- content = new String(Files.readAllBytes(Paths.get(args[0])));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder =
- ClientProtocol.DriverClientConfiguration.newBuilder();
- JsonFormat.parser()
- .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry())
- .merge(content, driverClientConfigurationProtoBuilder);
- final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto =
- driverClientConfigurationProtoBuilder.build();
switch (driverClientConfigurationProto.getRuntimeCase()) {
case YARN_RUNTIME:
final IDriverLauncher yarnDriverServiceLauncher = getYarnDriverServiceLauncher();
- yarnDriverServiceLauncher.launch(driverClientConfigurationProto);
- break;
+ return yarnDriverServiceLauncher.launch(driverClientConfigurationProto);
case LOCAL_RUNTIME:
final IDriverLauncher localDriverServiceLauncher = getLocalDriverServiceLauncher();
- localDriverServiceLauncher.launch(driverClientConfigurationProto);
- break;
+ return localDriverServiceLauncher.launch(driverClientConfigurationProto);
case AZBATCH_RUNTIME:
final IDriverLauncher azureBatchDriverServiceLauncher = getAzureBatchDriverServiceLauncher();
- azureBatchDriverServiceLauncher.launch(driverClientConfigurationProto);
- break;
+ return azureBatchDriverServiceLauncher.launch(driverClientConfigurationProto);
default:
+ throw new RuntimeException("Unknown runtime");
}
- LOG.log(Level.INFO, "JavaBridge: Stop Client {0}", driverClientConfigurationProto.getJobid());
- } catch (final BindException | InjectionException | IOException ex) {
+ } catch (final BindException | InjectionException ex) {
LOG.log(Level.SEVERE, "Job configuration error", ex);
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Main method that launches the REEF job.
+ *
+ * @param args command line parameters.
+ */
+ public static void main(final String[] args) throws InvalidProtocolBufferException {
+ if (args.length != 1) {
+ LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() +
+ " accepts single argument referencing a file that contains a client protocol buffer driver configuration");
+ }
+ final String content;
+ try {
+ content = new String(Files.readAllBytes(Paths.get(args[0])));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
+ final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder =
+ ClientProtocol.DriverClientConfiguration.newBuilder();
+ JsonFormat.parser()
+ .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry())
+ .merge(content, driverClientConfigurationProtoBuilder);
+ final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto =
+ driverClientConfigurationProtoBuilder.build();
+ final LauncherStatus status = launch(driverClientConfigurationProto);
+ LOG.log(Level.INFO, "Status: " + status.toString());
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java
index ca3817b..9857ab9 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java
@@ -63,10 +63,12 @@ public final class DriverClientClock implements Clock, IAlarmDispatchHandler {
@Override
public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
+ LOG.log(Level.INFO, "Schedule alarm offset {0}", offset);
final ClientAlarm alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler);
final String alarmId = UUID.randomUUID().toString();
this.alarmMap.put(alarmId, alarm);
this.driverServiceClient.onSetAlarm(alarmId, offset);
+ LOG.log(Level.INFO, "Alarm {0} scheduled at offset {1}", new Object[]{alarmId, offset});
return alarm;
}
@@ -117,6 +119,7 @@ public final class DriverClientClock implements Clock, IAlarmDispatchHandler {
*/
@Override
public void onNext(final String alarmId) {
+ LOG.log(Level.INFO, "Alarm {0} triggered", alarmId);
if (this.alarmMap.containsKey(alarmId)) {
final ClientAlarm clientAlarm = this.alarmMap.remove(alarmId);
clientAlarm.run();
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java
index 8c4eb28..ce66692 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java
@@ -50,6 +50,11 @@ import java.util.Set;
public final class DriverClientDispatcher {
/**
+ * Exception handler passed to the application event dispatcher.
+ */
+ private final DriverClientExceptionHandler exceptionHandler;
+
+ /**
* Dispatcher used for application provided event handlers.
*/
private final DispatchingEStage applicationDispatcher;
@@ -79,6 +84,12 @@ public final class DriverClientDispatcher {
*/
private final DispatchingEStage driverRestartDispatcher;
+
+ /**
+ * Stop handlers, invoked synchronously instead of through the dispatcher.
+ */
+ private final Set<EventHandler<StopTime>> stopHandlers;
+
@Inject
private DriverClientDispatcher(
final DriverClientExceptionHandler driverExceptionHandler,
@@ -124,12 +135,12 @@ public final class DriverClientDispatcher {
final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers,
@Parameter(ClientMessageHandlers.class)
final Set<EventHandler<byte[]>> clientMessageHandlers) {
-
+ this.exceptionHandler = driverExceptionHandler;
this.applicationDispatcher = new DispatchingEStage(
driverExceptionHandler, numberOfThreads, "ClientDriverDispatcher");
// Application start and stop handlers
this.applicationDispatcher.register(StartTime.class, startHandlers);
- this.applicationDispatcher.register(StopTime.class, stopHandlers);
+ this.stopHandlers = stopHandlers; // must be called synchronously
// Application Context event handlers
this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
@@ -276,8 +287,22 @@ public final class DriverClientDispatcher {
this.applicationDispatcher.onNext(StartTime.class, startTime);
}
- public void dispatch(final StopTime stopTime) {
- this.applicationDispatcher.onNext(StopTime.class, stopTime);
+ /**
+ * Invoke the stop handlers synchronously so that an exception they throw
+ * can be forwarded back via the bridge before the server shuts down.
+ * @param stopTime stop time
+ * @return the throwable from the first failing handler (later handlers are skipped), or null
+ */
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public Throwable dispatch(final StopTime stopTime) {
+ try {
+ for (final EventHandler<StopTime> handler : stopHandlers) {
+ handler.onNext(stopTime);
+ }
+ return null;
+ } catch (Throwable t) {
+ return t;
+ }
}
public void dispatch(final ActiveContext context) {
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java
index 54692ec..e1c01d6 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java
@@ -31,13 +31,19 @@ public final class DriverClientEvaluatorRequestor implements EvaluatorRequestor
private final IDriverServiceClient driverServiceClient;
+ private final IDriverClientService driverClientService;
+
@Inject
- private DriverClientEvaluatorRequestor(final IDriverServiceClient driverServiceClient) {
+ private DriverClientEvaluatorRequestor(
+ final IDriverServiceClient driverServiceClient,
+ final IDriverClientService driverClientService) {
this.driverServiceClient = driverServiceClient;
+ this.driverClientService = driverClientService;
}
@Override
public void submit(final EvaluatorRequest req) {
+ this.driverClientService.notifyEvaluatorRequest(req.getNumber());
this.driverServiceClient.onEvaluatorRequest(req);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java
index 9bd99d6..9e31b99 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java
@@ -20,6 +20,7 @@
package org.apache.reef.bridge.driver.client;
import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
import javax.inject.Inject;
import java.util.logging.Level;
@@ -31,13 +32,18 @@ import java.util.logging.Logger;
public final class DriverClientExceptionHandler implements EventHandler<Throwable> {
private static final Logger LOG = Logger.getLogger(DriverClientExceptionHandler.class.getName());
+ private final Clock clock;
+
@Inject
- private DriverClientExceptionHandler() {
+ private DriverClientExceptionHandler(final Clock clock) {
LOG.log(Level.FINE, "Instantiated 'DriverExceptionHandler'");
+ this.clock = clock;
}
@Override
public void onNext(final Throwable throwable) {
+ LOG.log(Level.SEVERE, throwable.toString());
+ this.clock.stop(throwable);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java
index 38758bd..3fd9cd0 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java
@@ -38,6 +38,13 @@ public interface IDriverClientService {
/**
+ * Notify the service that the application has requested
+ * {@code count} additional evaluators.
+ * @param count number of evaluators requested
+ */
+ void notifyEvaluatorRequest(final int count);
+
+ /**
* Wait for termination of driver client service.
*/
void awaitTermination() throws InterruptedException;
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java
index 7cc1346..1421666 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java
@@ -34,6 +34,8 @@ import java.util.List;
@DefaultImplementation(DriverServiceClient.class)
public interface IDriverServiceClient {
+ void onInitializationException(final Throwable ex);
+
/**
* Initiate shutdown.
*/
@@ -129,4 +131,11 @@ public interface IDriverServiceClient {
* @param message to send
*/
void onTaskMessage(final String taskId, final byte[] message);
+
+ /**
+ * Suspend a running task.
+ * @param taskId task identifier
+ * @param message optional message
+ */
+ void onSuspendTask(final String taskId, final Optional<byte[]> message);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java
index 3b675ea..964036f 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java
@@ -190,10 +190,24 @@ public final class JavaDriverClientLauncher {
Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig));
final Injector injector = Tang.Factory.getTang().newInjector(launcher.envConfig);
- try (final Clock reef = injector.getInstance(Clock.class)) {
- reef.run();
- } catch (final Throwable ex) {
- throw fatal("Unable to configure and start Clock.", ex);
+ try {
+ final IDriverServiceClient driverServiceClient = injector.getInstance(IDriverServiceClient.class);
+ try (final Clock reef = injector.getInstance(Clock.class)) {
+ reef.run();
+ } catch (final InjectionException ex) {
+ LOG.log(Level.SEVERE, "Unable to configure driver client.");
+ driverServiceClient.onInitializationException(ex.getCause() != null ? ex.getCause() : ex);
+ } catch (final Throwable t) {
+ if (t.getCause() != null && t.getCause() instanceof InjectionException) {
+ LOG.log(Level.SEVERE, "Unable to configure driver client.");
+ final InjectionException ex = (InjectionException) t.getCause();
+ driverServiceClient.onInitializationException(ex.getCause() != null ? ex.getCause() : ex);
+ } else {
+ throw fatal("Unable run clock.", t);
+ }
+ }
+ } catch (InjectionException e) {
+ throw fatal("Unable initialize driver service client.", e);
}
ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after Clock.close():");
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java
index 1c315bb..084d584 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java
@@ -40,7 +40,7 @@ public final class FailedContextBridge implements FailedContext {
private final Optional<ActiveContext> parentContext;
- private final Optional<byte[]> data;
+ private final Optional<Throwable> reason;
public FailedContextBridge(
final String contextId,
@@ -48,13 +48,13 @@ public final class FailedContextBridge implements FailedContext {
final String message,
final EvaluatorDescriptor evaluatorDescriptor,
final Optional<ActiveContext> parentContext,
- final Optional<byte[]> data) {
+ final Optional<Throwable> reason) {
this.contextId = contextId;
this.evaluatorId = evaluatorId;
this.message = message;
this.evaluatorDescriptor = evaluatorDescriptor;
this.parentContext = parentContext;
- this.data = data;
+ this.reason = reason;
}
@Override
@@ -74,12 +74,12 @@ public final class FailedContextBridge implements FailedContext {
@Override
public Optional<Throwable> getReason() {
- return Optional.<Throwable>of(new EvaluatorException(this.evaluatorId, this.message));
+ return this.reason;
}
@Override
public Optional<byte[]> getData() {
- return this.data;
+ return Optional.empty();
}
@Override
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java
index d6d3f5e..b537b71 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java
@@ -60,12 +60,12 @@ public final class RunningTaskBridge implements RunningTask {
@Override
public void suspend(final byte[] message) {
- throw new UnsupportedOperationException("Suspend task not supported");
+ this.driverServiceClient.onSuspendTask(this.taskId, Optional.of(message));
}
@Override
public void suspend() {
- throw new UnsupportedOperationException("Suspend task not supported");
+ this.driverServiceClient.onSuspendTask(this.taskId, Optional.<byte[]>empty());
}
@Override
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/SuspendedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/SuspendedTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/SuspendedTaskBridge.java
new file mode 100644
index 0000000..92b7993
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/SuspendedTaskBridge.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client.events;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.SuspendedTask;
+
+/**
+ * Bridge implementation of {@link SuspendedTask} holding the task id, context and result.
+ */
+public final class SuspendedTaskBridge implements SuspendedTask {
+
+ private final String taskId;
+
+ private final ActiveContext context;
+
+ private final byte[] result;
+
+ public SuspendedTaskBridge(final String taskId, final ActiveContext context, final byte[] result) {
+ this.taskId = taskId;
+ this.context = context;
+ this.result = result;
+ }
+
+ @Override
+ public ActiveContext getActiveContext() {
+ return this.context;
+ }
+
+ @Override
+ public byte[] get() {
+ return this.result;
+ }
+
+ @Override
+ public String getId() {
+ return this.taskId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java
index b646d6e..05fcd40 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java
@@ -20,6 +20,7 @@
package org.apache.reef.bridge.driver.client.grpc;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
@@ -39,6 +40,7 @@ import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.exception.EvaluatorException;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptorImpl;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
@@ -64,8 +66,12 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
private Server server;
+ private final Object lock = new Object();
+
private final InjectionFuture<Clock> clock;
+ private final ExceptionCodec exceptionCodec;
+
private final DriverServiceClient driverServiceClient;
private final TcpPortProvider tcpPortProvider;
@@ -76,22 +82,28 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
private final Map<String, ActiveContextBridge> activeContextBridgeMap = new HashMap<>();
- private boolean isIdle = false;
+ private int outstandingEvaluatorCount = 0;
@Inject
private DriverClientService(
+ final ExceptionCodec exceptionCodec,
final DriverServiceClient driverServiceClient,
final TcpPortProvider tcpPortProvider,
final InjectionFuture<Clock> clock,
final InjectionFuture<DriverClientDispatcher> clientDriverDispatcher) {
+ this.exceptionCodec = exceptionCodec;
this.driverServiceClient = driverServiceClient;
this.tcpPortProvider = tcpPortProvider;
this.clock = clock;
this.clientDriverDispatcher = clientDriverDispatcher;
}
- void setNotIdle() {
- this.isIdle = false;
+ @Override
+ public void notifyEvaluatorRequest(final int count) {
+ synchronized (this.lock) {
+ this.outstandingEvaluatorCount += count;
+ this.lock.notify();
+ }
}
@Override
@@ -123,21 +135,19 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
@Override
public void idlenessCheckHandler(final Void request, final StreamObserver<IdleStatus> responseObserver) {
- if (clock.get().isIdle() && this.evaluatorBridgeMap.isEmpty()) {
+ if (isIdle()) {
LOG.log(Level.INFO, "possibly idle. waiting for some action.");
- this.isIdle = true;
try {
- Thread.sleep(120000); // a couple of minutes
+ synchronized (this.lock) {
+ this.lock.wait(1000); // wait a second
+ }
} catch (InterruptedException e) {
LOG.log(Level.WARNING, e.getMessage());
}
- } else {
- LOG.log(Level.INFO, "not idle");
- this.isIdle = false;
}
responseObserver.onNext(IdleStatus.newBuilder()
.setReason("DriverClient checking idleness")
- .setIsIdle(this.isIdle)
+ .setIsIdle(this.isIdle())
.build());
responseObserver.onCompleted();
}
@@ -155,13 +165,22 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
}
@Override
- public void stopHandler(final StopTimeInfo request, final StreamObserver<Void> responseObserver) {
+ public void stopHandler(final StopTimeInfo request, final StreamObserver<ExceptionInfo> responseObserver) {
try {
LOG.log(Level.INFO, "StopHandler at time {0}", request.getStopTime());
final StopTime stopTime = new StopTime(request.getStopTime());
- this.clientDriverDispatcher.get().dispatch(stopTime);
+ final Throwable error = this.clientDriverDispatcher.get().dispatch(stopTime);
+ if (error != null) {
+ responseObserver.onNext(
+ ExceptionInfo.newBuilder()
+ .setName(error.getCause() != null ? error.getCause().toString() : error.toString())
+ .setMessage(error.getMessage() == null ? error.toString() : error.getMessage())
+ .setData(ByteString.copyFrom(exceptionCodec.toBytes(error)))
+ .build());
+ } else {
+ responseObserver.onNext(ExceptionInfo.newBuilder().setNoError(true).build());
+ }
} finally {
- responseObserver.onNext(null);
responseObserver.onCompleted();
this.server.shutdown();
}
@@ -181,7 +200,9 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
@Override
public void allocatedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) {
try {
- this.isIdle = false;
+ synchronized (this.lock) {
+ this.outstandingEvaluatorCount--;
+ }
LOG.log(Level.INFO, "Allocated evaluator id {0}", request.getEvaluatorId());
final AllocatedEvaluatorBridge eval = new AllocatedEvaluatorBridge(
request.getEvaluatorId(),
@@ -210,6 +231,15 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
@Override
public void failedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) {
try {
+ if (!this.evaluatorBridgeMap.containsKey(request.getEvaluatorId())) {
+ LOG.log(Level.INFO, "Failed evalautor that we were not allocated");
+ synchronized (this.lock) {
+ if (this.outstandingEvaluatorCount > 0) {
+ this.outstandingEvaluatorCount--;
+ }
+ }
+ return;
+ }
LOG.log(Level.INFO, "Failed Evaluator id {0}", request.getEvaluatorId());
final AllocatedEvaluatorBridge eval = this.evaluatorBridgeMap.remove(request.getEvaluatorId());
List<FailedContext> failedContextList = new ArrayList<>();
@@ -224,7 +254,7 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
context.getParentId().isPresent() ?
Optional.<ActiveContext>of(this.activeContextBridgeMap.get(context.getParentId().get())) :
Optional.<ActiveContext>empty(),
- Optional.<byte[]>empty()));
+ Optional.<Throwable>empty()));
}
for (final String failedContextId : request.getFailure().getFailedContextsList()) {
this.activeContextBridgeMap.remove(failedContextId);
@@ -253,7 +283,6 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
@Override
public void activeContextHandler(final ContextInfo request, final StreamObserver<Void> responseObserver) {
try {
- this.isIdle = false;
LOG.log(Level.INFO, "Active context id {0}", request.getContextId());
final AllocatedEvaluatorBridge eval = this.evaluatorBridgeMap.get(request.getEvaluatorId());
final ActiveContextBridge context = new ActiveContextBridge(
@@ -302,8 +331,9 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
final Optional<ActiveContext> parent = context.getParentId().isPresent() ?
Optional.<ActiveContext>of(this.activeContextBridgeMap.get(context.getParentId().get())) :
Optional.<ActiveContext>empty();
- final Optional<byte[]> data = request.getException().getData() != null ?
- Optional.of(request.getException().getData().toByteArray()) : Optional.<byte[]>empty();
+ final Optional<Throwable> reason = !request.getException().getData().isEmpty() ?
+ this.exceptionCodec.fromBytes(request.getException().getData().toByteArray()) :
+ Optional.<Throwable>empty();
this.clientDriverDispatcher.get().dispatch(
new FailedContextBridge(
context.getId(),
@@ -311,7 +341,7 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
request.getException().getMessage(),
context.getEvaluatorDescriptor(),
parent,
- data));
+ reason));
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
@@ -374,15 +404,15 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
this.activeContextBridgeMap.containsKey(request.getContext().getContextId()) ?
Optional.<ActiveContext>of(this.activeContextBridgeMap.get(request.getContext().getContextId())) :
Optional.<ActiveContext>empty();
- final Optional<byte[]> data = request.getException().getData() != null ?
- Optional.of(request.getException().getData().toByteArray()) : Optional.<byte[]>empty();
this.clientDriverDispatcher.get().dispatch(
new FailedTask(
request.getTaskId(),
request.getException().getMessage(),
Optional.of(request.getException().getName()),
- Optional.<Throwable>of(new EvaluatorException(request.getException().getMessage())),
- data,
+ request.getException().getData().isEmpty() ?
+ Optional.<Throwable>of(new EvaluatorException(request.getException().getMessage())) :
+ this.exceptionCodec.fromBytes(request.getException().getData().toByteArray()),
+ Optional.<byte[]>empty(),
context));
} finally {
responseObserver.onNext(null);
@@ -403,7 +433,8 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
new CompletedTaskBridge(
request.getTaskId(),
context,
- request.getResult() != null ? request.getResult().toByteArray() : null));
+ request.getResult() != null && !request.getResult().isEmpty() ?
+ request.getResult().toByteArray() : null));
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
@@ -412,7 +443,23 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
@Override
public void suspendedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) {
- responseObserver.onError(Status.INTERNAL.withDescription("Not supported").asRuntimeException());
+ try {
+ // Register the context inside the try so the call below is always completed,
+ // even if converting the context info throws.
+ final ContextInfo contextInfo = request.getContext();
+ if (!this.activeContextBridgeMap.containsKey(contextInfo.getContextId())) {
+ this.activeContextBridgeMap.put(contextInfo.getContextId(), toActiveContext(contextInfo));
+ }
+ LOG.log(Level.INFO, "Suspended task id {0}", request.getTaskId());
+ final ActiveContextBridge context = this.activeContextBridgeMap.get(contextInfo.getContextId());
+ this.clientDriverDispatcher.get().dispatch(
+ new SuspendedTaskBridge(
+ request.getTaskId(),
+ context,
+ request.getResult() != null && !request.getResult().isEmpty() ?
+ request.getResult().toByteArray() : null));
+ } finally {
+ responseObserver.onNext(null);
+ responseObserver.onCompleted();
+ }
}
@Override
@@ -589,6 +636,16 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl
}
// Helper methods
+ /**
+ * Reports whether the driver client is idle: the clock is idle, the outstanding
+ * evaluator count is zero, and no evaluator bridges are tracked.
+ * @return true when all three conditions hold
+ */
+ private boolean isIdle() {
+ // Sample each condition once so the logged values match the returned result.
+ final boolean clockIdle = this.clock.get().isIdle();
+ final long outstanding;
+ synchronized (this.lock) {
+ // outstandingEvaluatorCount is updated under this.lock; read it under the same lock.
+ outstanding = this.outstandingEvaluatorCount;
+ }
+ final int current = this.evaluatorBridgeMap.size();
+ LOG.log(Level.INFO, "Clock idle {0}, outstanding evaluators {1}, current evaluators {2}",
+ new Object[] {clockIdle, outstanding, current});
+ return clockIdle && outstanding == 0 && current == 0;
+ }
private EvaluatorDescriptor toEvaluatorDescriptor(final EvaluatorDescriptorInfo info) {
return new EvaluatorDescriptorImpl(
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java
index 81fb290..e89a5a5 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java
@@ -27,10 +27,11 @@ import org.apache.reef.bridge.driver.client.IDriverServiceClient;
import org.apache.reef.bridge.driver.client.JVMClientProcess;
import org.apache.reef.bridge.driver.client.grpc.parameters.DriverServicePort;
import org.apache.reef.bridge.proto.*;
+import org.apache.reef.bridge.proto.Void;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
@@ -38,6 +39,10 @@ import org.apache.reef.util.Optional;
import javax.inject.Inject;
import java.io.File;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* The client that exposes methods for communicating back to the
@@ -46,7 +51,7 @@ import java.util.List;
@Private
public final class DriverServiceClient implements IDriverServiceClient {
- private final InjectionFuture<DriverClientService> driverClientService;
+ private final ExceptionCodec exceptionCodec;
private final ConfigurationSerializer configurationSerializer;
@@ -54,11 +59,11 @@ public final class DriverServiceClient implements IDriverServiceClient {
@Inject
private DriverServiceClient(
- final InjectionFuture<DriverClientService> driverClientService,
final ConfigurationSerializer configurationSerializer,
+ final ExceptionCodec exceptionCodec,
@Parameter(DriverServicePort.class) final Integer driverServicePort) {
- this.driverClientService = driverClientService;
this.configurationSerializer = configurationSerializer;
+ this.exceptionCodec = exceptionCodec;
final ManagedChannel channel = ManagedChannelBuilder
.forAddress("localhost", driverServicePort)
.usePlaintext(true)
@@ -75,6 +80,23 @@ public final class DriverServiceClient implements IDriverServiceClient {
}
@Override
+ public void onInitializationException(final Throwable ex) {
+ // Send the failure (name, message, codec-serialized exception) via registerDriverClient
+ // and wait up to 5 seconds for the call to complete.
+ final Future<Void> callComplete = this.serviceStub.registerDriverClient(
+ DriverClientRegistration.newBuilder()
+ .setException(ExceptionInfo.newBuilder()
+ .setName(ex.getCause() != null ? ex.getCause().toString() : ex.toString())
+ .setMessage(ex.getMessage() == null ? ex.toString() : ex.getMessage())
+ .setData(ByteString.copyFrom(exceptionCodec.toBytes(ex)))
+ .build())
+ .build());
+ try {
+ callComplete.get(5, TimeUnit.SECONDS);
+ } catch (final InterruptedException e) {
+ // Restore the interrupt flag before propagating so callers can still observe it.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (final ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
public void onShutdown() {
this.serviceStub.shutdown(ShutdownRequest.newBuilder().build());
}
@@ -84,14 +106,14 @@ public final class DriverServiceClient implements IDriverServiceClient {
this.serviceStub.shutdown(ShutdownRequest.newBuilder()
.setException(ExceptionInfo.newBuilder()
.setName(ex.getCause() != null ? ex.getCause().toString() : ex.toString())
- .setMessage(ex.getMessage())
+ .setMessage(ex.getMessage() == null ? ex.toString() : ex.getMessage())
+ .setData(ByteString.copyFrom(exceptionCodec.toBytes(ex)))
.build())
.build());
}
@Override
public void onSetAlarm(final String alarmId, final int timeoutMS) {
- this.driverClientService.get().setNotIdle();
this.serviceStub.setAlarm(
AlarmRequest.newBuilder()
.setAlarmId(alarmId)
@@ -101,7 +123,6 @@ public final class DriverServiceClient implements IDriverServiceClient {
@Override
public void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest) {
- this.driverClientService.get().setNotIdle();
this.serviceStub.requestResources(
ResourceRequest.newBuilder()
.setCores(evaluatorRequest.getNumberOfCores())
@@ -215,18 +236,38 @@ public final class DriverServiceClient implements IDriverServiceClient {
@Override
public void onTaskClose(final String taskId, final Optional<byte[]> message) {
- this.serviceStub.runningTaskOp(RunningTaskRequest.newBuilder()
- .setTaskId(taskId)
- .setCloseTask(true)
- .setMessage(message.isPresent() ? ByteString.copyFrom(message.get()) : null)
- .build());
+ final RunningTaskRequest.Builder request = RunningTaskRequest.newBuilder()
+ .setTaskId(taskId)
+ .setOperation(RunningTaskRequest.Operation.CLOSE);
+ if (message.isPresent()) {
+ // The close message is optional; leave the field unset when none is supplied.
+ request.setMessage(ByteString.copyFrom(message.get()));
+ }
+ this.serviceStub.runningTaskOp(request.build());
}
@Override
public void onTaskMessage(final String taskId, final byte[] message) {
this.serviceStub.runningTaskOp(RunningTaskRequest.newBuilder()
.setTaskId(taskId)
+ .setOperation(RunningTaskRequest.Operation.SEND_MESSAGE)
.setMessage(ByteString.copyFrom(message))
.build());
}
+
+ /**
+ * Asks the driver service to suspend the given running task.
+ * @param taskId id of the task to suspend
+ * @param message optional payload forwarded with the suspend request
+ */
+ @Override
+ public void onSuspendTask(final String taskId, final Optional<byte[]> message) {
+ final RunningTaskRequest.Builder request = RunningTaskRequest.newBuilder()
+ .setTaskId(taskId)
+ .setOperation(RunningTaskRequest.Operation.SUSPEND);
+ if (message.isPresent()) {
+ // The suspend message is optional; leave the field unset when none is supplied.
+ request.setMessage(ByteString.copyFrom(message.get()));
+ }
+ this.serviceStub.runningTaskOp(request.build());
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java
index c122552..9ccf1d8 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java
@@ -19,6 +19,7 @@
package org.apache.reef.bridge.driver.launch;
import org.apache.reef.bridge.proto.ClientProtocol;
+import org.apache.reef.client.LauncherStatus;
/**
* All driver launchers implement this method.
@@ -29,5 +30,5 @@ public interface IDriverLauncher {
* Launch the driver with the dynamic {@link ClientProtocol.DriverClientConfiguration}.
* @param driverClientConfiguration dynamic driver configuration parameters
*/
- void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration);
+ LauncherStatus launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java
index d1b4557..e1235b8 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java
@@ -21,16 +21,14 @@ package org.apache.reef.bridge.driver.launch.azbatch;
import org.apache.reef.bridge.driver.launch.IDriverLauncher;
import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider;
import org.apache.reef.bridge.proto.ClientProtocol;
-import org.apache.reef.client.REEF;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
import org.apache.reef.runtime.azbatch.client.AzureBatchRuntimeConfiguration;
import org.apache.reef.runtime.azbatch.client.AzureBatchRuntimeConfigurationCreator;
import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import javax.inject.Inject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
/**
* This is a bootstrap launcher for Azure Batch for submission from C#. It allows for Java Driver
@@ -39,9 +37,6 @@ import java.util.logging.Logger;
*/
public final class AzureBatchLauncher implements IDriverLauncher {
- private static final Logger LOG = Logger.getLogger(AzureBatchLauncher.class.getName());
- private static final Tang TANG = Tang.Factory.getTang();
-
private final IDriverServiceConfigurationProvider driverServiceConfigurationProvider;
@Inject
@@ -49,17 +44,13 @@ public final class AzureBatchLauncher implements IDriverLauncher {
this.driverServiceConfigurationProvider = driverServiceConfigurationProvider;
}
- public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) {
- try (final REEF reef = TANG.newInjector(
- generateConfigurationFromJobSubmissionParameters(driverClientConfiguration)).getInstance(REEF.class)) {
- LOG.log(Level.INFO, "Submitting job");
- reef.submit(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration));
+ public LauncherStatus launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) {
+ try {
+ return DriverLauncher.getLauncher(generateConfigurationFromJobSubmissionParameters(driverClientConfiguration))
+ .run(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration));
} catch (InjectionException e) {
- fatal("unable to launch", e);
+ throw new RuntimeException(e);
}
- LOG.log(Level.INFO, "Exiting BootstrapLauncher.main()");
-
- System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main()
}
private static Configuration generateConfigurationFromJobSubmissionParameters(
@@ -83,9 +74,4 @@ public final class AzureBatchLauncher implements IDriverLauncher {
driverClientConfiguration.getAzbatchRuntime().getAzureStorageContainerName())
.build();
}
-
- private static RuntimeException fatal(final String msg, final Throwable t) {
- LOG.log(Level.SEVERE, msg, t);
- return new RuntimeException(msg, t);
- }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java
index 153693c..4cdbfcb 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java
@@ -23,6 +23,7 @@ import org.apache.reef.bridge.driver.launch.IDriverLauncher;
import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider;
import org.apache.reef.bridge.proto.ClientProtocol;
import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationModule;
@@ -45,7 +46,7 @@ public final class LocalLauncher implements IDriverLauncher {
this.driverServiceConfigurationProvider = driverServiceConfigurationProvider;
}
- public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) {
+ public LauncherStatus launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) {
ConfigurationModule localRuntimeCM = LocalRuntimeConfiguration.CONF;
if (driverClientConfiguration.getLocalRuntime().getMaxNumberOfEvaluators() > 0) {
localRuntimeCM = localRuntimeCM.set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS,
@@ -64,7 +65,7 @@ public final class LocalLauncher implements IDriverLauncher {
driverClientConfiguration.getDriverJobSubmissionDirectory());
}
try {
- DriverLauncher
+ return DriverLauncher
.getLauncher(localRuntimeCM.build())
.run(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration));
} catch (InjectionException e) {
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java
index 21f3989..810fad6 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java
@@ -50,7 +50,7 @@ public final class YarnLauncher implements IDriverLauncher {
private YarnLauncher(){
}
- public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) {
+ public LauncherStatus launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) {
try {
try {
final IDriverServiceConfigurationProvider driverConfigurationProvider =
@@ -77,6 +77,7 @@ public final class YarnLauncher implements IDriverLauncher {
LOG.log(Level.SEVERE, status.getError().get().getMessage());
status.getError().get().printStackTrace();
}
+ return status;
} catch (InjectionException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java
index eebc51c..b9690cc 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java
@@ -67,6 +67,7 @@ public final class DriverServiceConfiguration extends ConfigurationModuleBuilder
.set(DriverConfiguration.ON_TASK_RUNNING, DriverServiceHandlers.RunningTaskHandler.class)
.set(DriverConfiguration.ON_TASK_COMPLETED, DriverServiceHandlers.CompletedTaskHandler.class)
.set(DriverConfiguration.ON_TASK_FAILED, DriverServiceHandlers.FailedTaskHandler.class)
+ .set(DriverConfiguration.ON_TASK_SUSPENDED, DriverServiceHandlers.SuspendedTaskHandler.class)
.set(DriverConfiguration.ON_TASK_MESSAGE, DriverServiceHandlers.TaskMessageHandler.class)
.set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverServiceHandlers.ClientMessageHandler.class)
.set(DriverConfiguration.ON_CLIENT_CLOSED, DriverServiceHandlers.ClientCloseHandler.class)
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
index 7666695..5c2476b 100644
--- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
@@ -22,7 +22,6 @@ import com.google.protobuf.ByteString;
import io.grpc.*;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
import org.apache.reef.bridge.driver.service.DriverClientException;
import org.apache.reef.bridge.driver.service.IDriverService;
import org.apache.reef.bridge.service.parameters.DriverClientCommand;
@@ -39,9 +38,11 @@ import org.apache.reef.driver.task.*;
import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
import org.apache.reef.runtime.common.driver.idle.IdleMessage;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.OSUtils;
+import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.apache.reef.wake.time.Clock;
@@ -53,9 +54,14 @@ import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -67,14 +73,18 @@ public final class GRPCDriverService implements IDriverService {
private static final Void VOID = Void.newBuilder().build();
- private Server server;
-
private Process driverProcess;
+ private enum StreamType { STDOUT, STDERR }
+
+ private Server server;
+
private DriverClientGrpc.DriverClientFutureStub clientStub;
private final Clock clock;
+ private final ExceptionCodec exceptionCodec;
+
private final ConfigurationSerializer configurationSerializer;
private final EvaluatorRequestor evaluatorRequestor;
@@ -103,8 +113,10 @@ public final class GRPCDriverService implements IDriverService {
final JVMProcessFactory jvmProcessFactory,
final CLRProcessFactory clrProcessFactory,
final TcpPortProvider tcpPortProvider,
+ final ExceptionCodec exceptionCodec,
@Parameter(DriverClientCommand.class) final String driverClientCommand) {
this.clock = clock;
+ this.exceptionCodec = exceptionCodec;
this.configurationSerializer = configurationSerializer;
this.jvmProcessFactory = jvmProcessFactory;
this.clrProcessFactory = clrProcessFactory;
@@ -130,12 +142,19 @@ public final class GRPCDriverService implements IDriverService {
throw new IOException("Unable to start gRPC server");
} else {
final String cmd = this.driverClientCommand + " " + this.server.getPort();
- final String cmdOs = OSUtils.isWindows() ? "cmd.exe /c \"" + cmd + "\"" : cmd;
+ final List<String> cmdOs = OSUtils.isWindows() ?
+ Arrays.asList("cmd.exe", "/c", cmd) : Arrays.asList("/bin/sh", "-c", cmd);
LOG.log(Level.INFO, "CMD: " + cmdOs);
- this.driverProcess = Runtime.getRuntime().exec(cmdOs, null, new File(System.getProperty("user.dir")));
+ this.driverProcess = new ProcessBuilder()
+ .command(cmdOs)
+ .redirectError(new File("driverclient.stderr"))
+ .redirectOutput(new File("driverclient.stdout"))
+ .directory(new File(System.getProperty("user.dir")))
+ .start();
synchronized (this) {
+ int attempts = 10; // give some time
// wait for driver client process to register
- while (this.clientStub == null && driverProcessIsAlive()) {
+ while (attempts-- > 0 && this.clientStub == null && driverProcessIsAlive()) {
LOG.log(Level.INFO, "waiting for driver process to register");
this.wait(1000); // a second
}
@@ -164,10 +183,12 @@ public final class GRPCDriverService implements IDriverService {
LOG.log(Level.INFO, "STOP: gRPC Driver Service", t);
if (!stopped) {
try {
- if (t != null) {
- clock.stop(t);
- } else {
- clock.stop();
+ if (!clock.isClosed()) {
+ if (t != null) {
+ clock.stop(t);
+ } else {
+ clock.stop();
+ }
}
if (server != null) {
LOG.log(Level.INFO, "Shutdown gRPC");
@@ -191,35 +212,40 @@ public final class GRPCDriverService implements IDriverService {
if (!driverProcessIsAlive()) {
LOG.log(Level.INFO, "Exit code: " + this.driverProcess.exitValue());
}
- LOG.log(Level.INFO, "capturing driver process stderr");
- StringBuffer outputBuffer = new StringBuffer();
+ dumpStream(StreamType.STDOUT);
+ dumpStream(StreamType.STDERR);
+ }
+
+ private void dumpStream(final StreamType type) {
+ StringBuffer buffer = new StringBuffer();
+
+ String name = "";
+ InputStream stream = null;
+ switch(type) {
+ case STDOUT:
+ name = "stdout";
+ stream = this.driverProcess.getInputStream();
+ break;
+ case STDERR:
+ name = "stderr";
+ stream = this.driverProcess.getErrorStream();
+ break;
+ default:
+ LOG.log(Level.WARNING, "Invalid stream type value");
+ }
+
+ LOG.log(Level.INFO, "capturing driver process " + name);
try {
int nextChar;
- final InputStream errStream = this.driverProcess.getErrorStream();
- outputBuffer.append("\nSTDERR =======================================\n");
- while ((nextChar = errStream.read()) != -1) {
- outputBuffer.append((char) nextChar);
- }
- outputBuffer.append("\n==============================================\n");
- final InputStream outStream = this.driverProcess.getInputStream();
- outputBuffer.append("\nSTDOUT =======================================\n");
- while ((nextChar = outStream.read()) != -1) {
- outputBuffer.append((char) nextChar);
- }
- outputBuffer.append("\n==============================================\n");
+ buffer.append("\n==============================================\n");
+ while ((nextChar = stream.read()) != -1) {
+ buffer.append((char) nextChar);
+ }
+ buffer.append("\n==============================================\n");
} catch (IOException e) {
LOG.log(Level.WARNING, "Error while capturing output stream: " + e.getMessage());
}
- LOG.log(Level.INFO, outputBuffer.toString());
- }
-
- /**
- * Await termination on the main thread since the grpc library uses daemon threads.
- */
- private void blockUntilShutdown() throws InterruptedException {
- if (server != null) {
- server.awaitTermination();
- }
+ LOG.log(Level.INFO, buffer.toString());
}
/**
@@ -292,16 +318,31 @@ public final class GRPCDriverService implements IDriverService {
@Override
public void stopHandler(final StopTime stopTime) {
synchronized (this) {
+ // Always stop the service, even when no driver client has registered.
try {
if (clientStub != null) {
- this.clientStub.stopHandler(
- StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build());
+ final Future<ExceptionInfo> callCompletion = this.clientStub.stopHandler(
+ StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build());
+ final ExceptionInfo error;
+ try {
+ error = callCompletion.get(5L, TimeUnit.MINUTES);
+ } catch (final InterruptedException e) {
+ // Restore the interrupt flag before propagating.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (final ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (final TimeoutException e) {
+ throw new RuntimeException("stop handler timed out", e);
+ }
+ if (!error.getNoError()) {
+ final Optional<Throwable> t = parseException(error);
+ if (t.isPresent()) {
+ throw new RuntimeException("driver stop exception",
+ t.get().getCause() != null ? t.get().getCause() : t.get());
+ } else {
+ // Protobuf string fields are never null; fall back to the name when the message is empty.
+ throw new RuntimeException(error.getMessage().isEmpty() ? error.getName() : error.getMessage());
+ }
+ }
}
} finally {
stop();
}
}
-
}
@Override
@@ -344,7 +385,7 @@ public final class GRPCDriverService implements IDriverService {
.setEvaluatorId(context.getEvaluatorId())
.setParentId(
context.getParentId().isPresent() ?
- context.getParentId().get() : null)
+ context.getParentId().get() : "")
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
context.getEvaluatorDescriptor()))
.build());
@@ -369,17 +410,41 @@ public final class GRPCDriverService implements IDriverService {
@Override
public void failedContextHandler(final FailedContext context) {
synchronized (this) {
- this.activeContextMap.remove(context.getId());
- this.clientStub.closedContextHandler(
+ final ContextInfo.Builder contextInfoBuilder =
ContextInfo.newBuilder()
.setContextId(context.getId())
.setEvaluatorId(context.getEvaluatorId())
.setParentId(
context.getParentContext().isPresent() ?
- context.getParentContext().get().getId() : null)
+ context.getParentContext().get().getId() : "")
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
- context.getEvaluatorDescriptor()))
- .build());
+ context.getEvaluatorDescriptor()));
+ if (context.getReason().isPresent()) {
+ final Throwable reason = context.getReason().get();
+ contextInfoBuilder.setException(ExceptionInfo.newBuilder()
+ .setName(reason.toString())
+ .setMessage(context.getMessage() != null ? context.getMessage() : "")
+ .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
+ .build());
+ } else if (context.getData().isPresent()) {
+ contextInfoBuilder.setException(ExceptionInfo.newBuilder()
+ .setName(context.toString())
+ .setMessage(context.getDescription().isPresent() ?
+ context.getDescription().get() :
+ context.getMessage() != null ? context.getMessage() : "")
+ .setData(ByteString.copyFrom(context.getData().get()))
+ .build());
+ } else {
+ final Throwable reason = context.asError();
+ contextInfoBuilder.setException(ExceptionInfo.newBuilder()
+ .setName(reason.toString())
+ .setMessage(context.getMessage() != null ? context.getMessage() : "")
+ .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
+ .build());
+
+ }
+ this.activeContextMap.remove(context.getId());
+ this.clientStub.failedContextHandler(contextInfoBuilder.build());
}
}
@@ -425,18 +490,40 @@ public final class GRPCDriverService implements IDriverService {
!this.activeContextMap.containsKey(task.getActiveContext().get().getId())) {
this.activeContextMap.put(task.getActiveContext().get().getId(), task.getActiveContext().get());
}
+ final TaskInfo.Builder taskInfoBuilder = TaskInfo.newBuilder() // context and exception are attached conditionally below
+ .setTaskId(task.getId());
+ if (task.getActiveContext().isPresent()) { // set context only when present; the removed code passed null to setContext
+ taskInfoBuilder.setContext(ContextInfo.newBuilder()
+ .setContextId(task.getActiveContext().get().getId())
+ .setEvaluatorId(task.getActiveContext().get().getEvaluatorId())
+ .setParentId(task.getActiveContext().get().getParentId().isPresent() ?
+ task.getActiveContext().get().getParentId().get() : "")
+ .build());
+ }
+ if (task.getReason().isPresent()) { // 1) Throwable reason available: serialize it with exceptionCodec
+ final Throwable reason = task.getReason().get();
+ taskInfoBuilder.setException(ExceptionInfo.newBuilder()
+ .setName(reason.toString())
+ .setMessage(task.getMessage() != null ? task.getMessage() : "")
+ .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
+ .build());
+ } else if (task.getData().isPresent()) { // 2) raw task data: name from asError(), bytes forwarded as-is
+ final Throwable reason = task.asError();
+ taskInfoBuilder.setException(ExceptionInfo.newBuilder()
+ .setName(reason.toString())
+ .setMessage(task.getMessage() != null ? task.getMessage() : "")
+ .setData(ByteString.copyFrom(task.getData().get()))
+ .build());
+ } else { // 3) neither: serialize task.asError() with exceptionCodec
+ final Throwable reason = task.asError();
+ taskInfoBuilder.setException(ExceptionInfo.newBuilder()
+ .setName(reason.toString())
+ .setMessage(task.getMessage() != null ? task.getMessage() : "")
+ .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
+ .build());
+ }
this.runningTaskMap.remove(task.getId());
- this.clientStub.failedTaskHandler(
- TaskInfo.newBuilder()
- .setTaskId(task.getId())
- .setContext(task.getActiveContext().isPresent() ?
- ContextInfo.newBuilder()
- .setContextId(task.getActiveContext().get().getId())
- .setEvaluatorId(task.getActiveContext().get().getEvaluatorId())
- .setParentId(task.getActiveContext().get().getParentId().isPresent() ?
- task.getActiveContext().get().getParentId().get() : "")
- .build() : null)
- .build());
+ this.clientStub.failedTaskHandler(taskInfoBuilder.build()); // single build() once all optional fields are set
}
}
@@ -478,6 +565,8 @@ public final class GRPCDriverService implements IDriverService {
.setParentId(task.getActiveContext().getParentId().isPresent() ?
task.getActiveContext().getParentId().get() : "")
.build())
+ .setResult(task.get() == null || task.get().length == 0 ? // generated protobuf setters throw NPE on null,
+ ByteString.EMPTY : ByteString.copyFrom(task.get())) // so an absent/empty result is sent as ByteString.EMPTY
.build());
}
}
@@ -606,6 +695,14 @@ public final class GRPCDriverService implements IDriverService {
}
}
+ private Optional<Throwable> parseException(final ExceptionInfo info) { // decode ExceptionInfo.data back into a Java Throwable
+ if (info.getData().isEmpty()) { // protobuf getters never return null; empty bytes means nothing to decode
+ return Optional.empty();
+ } else {
+ return exceptionCodec.fromBytes(info.getData().toByteArray()); // empty Optional when the bytes cannot be parsed in Java
+ }
+ }
+
private final class DriverBridgeServiceImpl
extends DriverServiceGrpc.DriverServiceImplBase {
@@ -613,16 +710,31 @@ public final class GRPCDriverService implements IDriverService {
public void registerDriverClient(
final DriverClientRegistration request,
final StreamObserver<Void> responseObserver) {
+ LOG.log(Level.INFO, "driver client register");
try {
- final ManagedChannel channel = ManagedChannelBuilder
- .forAddress(request.getHost(), request.getPort())
- .usePlaintext(true)
- .build();
- synchronized (GRPCDriverService.this) {
- GRPCDriverService.this.clientStub = DriverClientGrpc.newFutureStub(channel);
- GRPCDriverService.this.notifyAll();
+ if (request.hasException()) { // client reported a startup failure: stop the clock instead of registering
+ LOG.log(Level.SEVERE, "Driver client initialization exception");
+ final Optional<Throwable> ex = parseException(request.getException());
+ if (ex.isPresent()) {
+ GRPCDriverService.this.clock.stop(ex.get());
+ } else { // not decodable as a Java Throwable: wrap its message (or name) in a RuntimeException
+ GRPCDriverService.this.clock.stop(new RuntimeException(
+ request.getException().getMessage().isEmpty() ? // proto strings are never null; "" means unset
+ request.getException().getName() :
+ request.getException().getMessage()
+ ));
+ }
+ } else { // normal registration: open a plaintext channel back to the client
+ final ManagedChannel channel = ManagedChannelBuilder
+ .forAddress(request.getHost(), request.getPort())
+ .usePlaintext(true)
+ .build();
+ synchronized (GRPCDriverService.this) {
+ GRPCDriverService.this.clientStub = DriverClientGrpc.newFutureStub(channel);
+ GRPCDriverService.this.notifyAll(); // presumably wakes threads waiting for clientStub to be set
+ }
+ LOG.log(Level.INFO, "Driver has registered on port " + request.getPort()); // logged only on successful registration
}
- LOG.log(Level.INFO, "Driver has registered on port " + request.getPort());
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
@@ -662,13 +774,20 @@ public final class GRPCDriverService implements IDriverService {
final ShutdownRequest request,
final StreamObserver<Void> responseObserver) {
try {
- synchronized (GRPCDriverService.this) {
- if (request.getException() != null) {
+ LOG.log(Level.INFO, "driver shutdown");
+ if (request.hasException()) { // hasException() rather than != null: proto message getters never return null
+ final Optional<Throwable> exception = parseException(request.getException());
+ if (exception.isPresent()) { // decodable Java Throwable: stop the clock with it directly
+ LOG.log(Level.INFO, "driver exception: " + exception.get().toString());
+ GRPCDriverService.this.clock.stop(exception.get());
+ } else {
+ // exception that cannot be parsed in java
GRPCDriverService.this.clock.stop(
new DriverClientException(request.getException().getMessage()));
- } else {
- GRPCDriverService.this.clock.stop();
}
+ } else { // no exception attached: normal shutdown
+ LOG.log(Level.INFO, "clean shutdown");
+ GRPCDriverService.this.clock.stop();
}
} finally {
responseObserver.onNext(null);
@@ -681,17 +800,23 @@ public final class GRPCDriverService implements IDriverService {
final AlarmRequest request,
final StreamObserver<Void> responseObserver) {
try {
- synchronized (GRPCDriverService.this) {
- GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler<Alarm>() {
- @Override
- public void onNext(final Alarm value) {
- synchronized (GRPCDriverService.this) {
- GRPCDriverService.this.clientStub.alarmTrigger(
- AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build());
- }
+ // do not synchronize when scheduling an alarm (or deadlock)
+ LOG.log(Level.INFO, "Set alarm {0} offset {1}",
+ new Object[] {request.getAlarmId(), request.getTimeoutMs()});
+ LOG.log(Level.INFO, "Alarm class " + GRPCDriverService.this.clock.getClass());
+ GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler<Alarm>() { // scheduled without holding the service monitor
+ @Override
+ public void onNext(final Alarm value) { // alarm fired: notify the client with the original alarm id
+ LOG.log(Level.INFO, "Trigger alarm {0}", request.getAlarmId());
+ synchronized (GRPCDriverService.this) { // monitor held only around the clientStub call
+ GRPCDriverService.this.clientStub.alarmTrigger(
+ AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build());
+ LOG.log(Level.INFO, "DONE: trigger alarm {0}", request.getAlarmId());
}
- });
- }
+ }
+ });
+ LOG.log(Level.INFO, "Alarm {0} scheduled is idle? {1}",
+ new Object[] {request.getAlarmId(), clock.isIdle()});
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
@@ -750,10 +875,9 @@ public final class GRPCDriverService implements IDriverService {
if (StringUtils.isEmpty(request.getEvaluatorConfiguration())) {
// Assume that we are running Java driver client, but this assumption could be a bug so log a warning
LOG.log(Level.WARNING, "No evaluator configuration detected. Assuming a Java driver client.");
+ if (StringUtils.isNotEmpty(request.getContextConfiguration()) && // unset proto strings are "", never null
+ StringUtils.isNotEmpty(request.getTaskConfiguration())) {
// submit context and task
- Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set");
- Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set");
try {
evaluator.submitContextAndTask(
configurationSerializer.fromString(request.getContextConfiguration()),
@@ -761,17 +885,15 @@ public final class GRPCDriverService implements IDriverService {
} catch (IOException e) {
throw new RuntimeException(e);
}
- } else if (request.getContextConfiguration() != null) {
+ } else if (StringUtils.isNotEmpty(request.getContextConfiguration())) { // context only (task configuration empty)
// submit context
- Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set");
try {
evaluator.submitContext(configurationSerializer.fromString(request.getContextConfiguration()));
} catch (IOException e) {
throw new RuntimeException(e);
}
- } else if (request.getTaskConfiguration() != null) {
+ } else if (StringUtils.isNotEmpty(request.getTaskConfiguration())) { // task only (context configuration empty)
// submit task
- Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set");
try {
evaluator.submitTask(configurationSerializer.fromString(request.getTaskConfiguration()));
} catch (IOException e) {
@@ -781,26 +903,20 @@ public final class GRPCDriverService implements IDriverService {
throw new RuntimeException("Missing check for required evaluator configurations");
}
} else {
- if (request.getContextConfiguration() != null && request.getTaskConfiguration() != null) {
+ if (StringUtils.isNotEmpty(request.getContextConfiguration()) && // evaluator configuration is non-empty in this branch
+ StringUtils.isNotEmpty(request.getTaskConfiguration())) {
// submit context and task
- Validate.notEmpty(request.getEvaluatorConfiguration(), "Evaluator configuration not set");
- Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set");
- Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set");
((AllocatedEvaluatorImpl) evaluator).submitContextAndTask(
request.getEvaluatorConfiguration(),
request.getContextConfiguration(),
request.getTaskConfiguration());
- } else if (request.getContextConfiguration() != null) {
+ } else if (StringUtils.isNotEmpty(request.getContextConfiguration())) { // context only
// submit context
- Validate.notEmpty(request.getEvaluatorConfiguration(), "Evaluator configuration not set");
- Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set");
((AllocatedEvaluatorImpl) evaluator).submitContext(
request.getEvaluatorConfiguration(),
request.getContextConfiguration());
- } else if (request.getTaskConfiguration() != null) {
+ } else if (StringUtils.isNotEmpty(request.getTaskConfiguration())) { // task only
// submit task
- Validate.notEmpty(request.getEvaluatorConfiguration(), "Evaluator configuration not set");
- Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set");
((AllocatedEvaluatorImpl) evaluator).submitTask(
request.getEvaluatorConfiguration(),
request.getTaskConfiguration());
@@ -884,23 +1000,40 @@ public final class GRPCDriverService implements IDriverService {
final StreamObserver<Void> responseObserver) {
synchronized (GRPCDriverService.this) {
if (!GRPCDriverService.this.runningTaskMap.containsKey(request.getTaskId())) {
+ LOG.log(Level.WARNING, "Unknown task id {0}", request.getTaskId());
responseObserver.onError(Status.INTERNAL
.withDescription("Task does not exist with id " + request.getTaskId()).asRuntimeException());
- }
- try {
- final RunningTask task = GRPCDriverService.this.runningTaskMap.get(request.getTaskId());
- if (request.getCloseTask()) {
- if (request.getMessage() != null) {
- task.close(request.getMessage().toByteArray());
- } else {
- task.close();
+ } else { // task exists; the onError branch above no longer falls through to onNext/onCompleted
+ try {
+ final RunningTask task = GRPCDriverService.this.runningTaskMap.get(request.getTaskId());
+ switch (request.getOperation()) { // CLOSE, SUSPEND and SEND_MESSAGE are handled; anything else throws
+ case CLOSE:
+ LOG.log(Level.INFO, "close task {0}", task.getId());
+ if (request.getMessage().isEmpty()) { // empty message: use the no-argument variant
+ task.close();
+ } else {
+ task.close(request.getMessage().toByteArray());
+ }
+ break;
+ case SUSPEND:
+ LOG.log(Level.INFO, "suspend task {0}", task.getId());
+ if (request.getMessage().isEmpty()) { // empty message: use the no-argument variant
+ task.suspend();
+ } else {
+ task.suspend(request.getMessage().toByteArray());
+ }
+ break;
+ case SEND_MESSAGE:
+ LOG.log(Level.INFO, "send message to task {0}", task.getId());
+ task.send(request.getMessage().toByteArray());
+ break;
+ default:
+ throw new RuntimeException("Unknown operation " + request.getOperation()); // NOTE(review): finally still calls onCompleted() without onNext(); consider onError
}
- } else if (request.getMessage() != null) {
- task.send(request.getMessage().toByteArray());
+ responseObserver.onNext(null); // reached only when the operation did not throw
+ } finally {
+ responseObserver.onCompleted();
}
- } finally {
- responseObserver.onNext(null);
- responseObserver.onCompleted();
}
}
}