You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2018/03/28 19:05:18 UTC
[1/4] reef git commit: [REEF-1965] Implement REEF runtime for Azure
Batch
Repository: reef
Updated Branches:
refs/heads/master 7924188ca -> 561a336f2
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/test/java/org/apache/reef/runtime/azbatch/CommandBuilderTests.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/test/java/org/apache/reef/runtime/azbatch/CommandBuilderTests.java b/lang/java/reef-runtime-azbatch/src/test/java/org/apache/reef/runtime/azbatch/CommandBuilderTests.java
new file mode 100644
index 0000000..230cca3
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/test/java/org/apache/reef/runtime/azbatch/CommandBuilderTests.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch;
+
+import org.apache.reef.runtime.azbatch.util.command.LinuxCommandBuilder;
+import org.apache.reef.runtime.azbatch.util.command.WindowsCommandBuilder;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.Optional;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Checks the exact command lines produced by {@link LinuxCommandBuilder} and
+ * {@link WindowsCommandBuilder} for the driver and for the evaluator shim.
+ */
+public final class CommandBuilderTests {
+
+  private LinuxCommandBuilder linuxCommandBuilder;
+  private WindowsCommandBuilder windowsCommandBuilder;
+
+  /**
+   * Builds both command builders from an injector whose classpath provider is a
+   * mock returning fixed prefix/suffix entries, so the expected strings are stable.
+   */
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setUp() throws InjectionException {
+    final RuntimeClasspathProvider stubProvider = mock(RuntimeClasspathProvider.class);
+    when(stubProvider.getDriverClasspathPrefix())
+        .thenReturn(Arrays.asList("c:\\driverpath1", "c:\\driverpath2"));
+    when(stubProvider.getDriverClasspathSuffix())
+        .thenReturn(Arrays.asList("driverclasspathsuffix"));
+    when(stubProvider.getEvaluatorClasspathPrefix())
+        .thenReturn(Arrays.asList("c:\\evaluatorpath1", "c:\\evaluatorpath2"));
+    when(stubProvider.getEvaluatorClasspathSuffix())
+        .thenReturn(Arrays.asList("evaluatorclasspathsuffix"));
+
+    final Injector testInjector = Tang.Factory.getTang().newInjector();
+    testInjector.bindVolatileInstance(RuntimeClasspathProvider.class, stubProvider);
+    this.linuxCommandBuilder = testInjector.getInstance(LinuxCommandBuilder.class);
+    this.windowsCommandBuilder = testInjector.getInstance(WindowsCommandBuilder.class);
+  }
+
+  /**
+   * Creates a mocked job submission that reports the given driver memory.
+   */
+  private static JobSubmissionEvent mockSubmissionWithDriverMemory(final int driverMemory) {
+    final JobSubmissionEvent submission = mock(JobSubmissionEvent.class);
+    when(submission.getDriverMemory()).thenReturn(Optional.of(driverMemory));
+    return submission;
+  }
+
+  @Test
+  public void linuxCommandBuilderDriverTest() {
+    final String expectedCommand =
+        "/bin/sh -c \"unzip local.jar -d 'reef/'; {{JAVA_HOME}}/bin/java -Xmx100m -XX:PermSize=128m " +
+        "-XX:MaxPermSize=128m -ea -classpath " +
+        "c:\\driverpath1:c:\\driverpath2:reef/local/*:reef/global/*:driverclasspathsuffix " +
+        "-Dproc_reef org.apache.reef.runtime.common.REEFLauncher reef/local/driver.conf\"";
+
+    final String builtCommand =
+        this.linuxCommandBuilder.buildDriverCommand(mockSubmissionWithDriverMemory(100));
+
+    Assert.assertEquals(expectedCommand, builtCommand);
+  }
+
+  @Test
+  public void windowsCommandBuilderDriverTest() {
+    final String expectedCommand = "powershell.exe /c \"Add-Type -AssemblyName System.IO.Compression.FileSystem; " +
+        "[System.IO.Compression.ZipFile]::ExtractToDirectory(\\\"$env:AZ_BATCH_TASK_WORKING_DIR\\local.jar\\\", " +
+        "\\\"$env:AZ_BATCH_TASK_WORKING_DIR\\reef\\\"); {{JAVA_HOME}}/bin/java -Xmx100m -XX:PermSize=128m " +
+        "-XX:MaxPermSize=128m -ea -classpath " +
+        "'c:\\driverpath1;c:\\driverpath2;reef/local/*;reef/global/*;driverclasspathsuffix' " +
+        "-Dproc_reef org.apache.reef.runtime.common.REEFLauncher reef/local/driver.conf\";";
+
+    final String builtCommand =
+        this.windowsCommandBuilder.buildDriverCommand(mockSubmissionWithDriverMemory(100));
+
+    Assert.assertEquals(expectedCommand, builtCommand);
+  }
+
+  @Test
+  public void linuxCommandBuilderShimEvaluatorTest() {
+    final String expectedCommand = "/bin/sh -c \"unzip local.jar -d 'reef/'; {{JAVA_HOME}}/bin/java -Xmx1m " +
+        "-XX:PermSize=128m -XX:MaxPermSize=128m -ea " +
+        "-classpath c:\\evaluatorpath1:c:\\evaluatorpath2:reef/local/*:reef/global/*:evaluatorclasspathsuffix " +
+        "-Dproc_reef org.apache.reef.runtime.azbatch.evaluator.EvaluatorShimLauncher conf\"";
+
+    final String builtCommand = this.linuxCommandBuilder.buildEvaluatorShimCommand(1, "conf");
+
+    Assert.assertEquals(expectedCommand, builtCommand);
+  }
+
+  @Test
+  public void windowsCommandBuilderShimEvaluatorTest() {
+    final String expectedCommand = "powershell.exe /c \"Add-Type -AssemblyName System.IO.Compression.FileSystem; " +
+        "[System.IO.Compression.ZipFile]::ExtractToDirectory(\\\"$env:AZ_BATCH_TASK_WORKING_DIR\\local.jar\\\", " +
+        "\\\"$env:AZ_BATCH_TASK_WORKING_DIR\\reef\\\"); {{JAVA_HOME}}/bin/java -Xmx1m -XX:PermSize=128m " +
+        "-XX:MaxPermSize=128m -ea -classpath " +
+        "'c:\\evaluatorpath1;c:\\evaluatorpath2;reef/local/*;reef/global/*;evaluatorclasspathsuffix' -Dproc_reef " +
+        "org.apache.reef.runtime.azbatch.evaluator.EvaluatorShimLauncher conf\";";
+
+    final String builtCommand = this.windowsCommandBuilder.buildEvaluatorShimCommand(1, "conf");
+
+    Assert.assertEquals(expectedCommand, builtCommand);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/test/java/org/apache/reef/runtime/azbatch/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/test/java/org/apache/reef/runtime/azbatch/package-info.java b/lang/java/reef-runtime-azbatch/src/test/java/org/apache/reef/runtime/azbatch/package-info.java
new file mode 100644
index 0000000..13a5caa
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/test/java/org/apache/reef/runtime/azbatch/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Unit tests for REEF Driver running under Azure Batch runtime.
+ */
+package org.apache.reef.runtime.azbatch;
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-tests/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/pom.xml b/lang/java/reef-tests/pom.xml
index beefd5f..b096e39 100644
--- a/lang/java/reef-tests/pom.xml
+++ b/lang/java/reef-tests/pom.xml
@@ -44,6 +44,11 @@ under the License.
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-azbatch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>reef-runtime-local</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AzureBatchTestEnvironment.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AzureBatchTestEnvironment.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AzureBatchTestEnvironment.java
new file mode 100644
index 0000000..7076dc9
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AzureBatchTestEnvironment.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests;
+
+import org.apache.reef.runtime.azbatch.client.AzureBatchRuntimeConfiguration;
+import org.apache.reef.runtime.azbatch.client.AzureBatchRuntimeConfigurationProvider;
+import org.apache.reef.runtime.azbatch.driver.RuntimeIdentifier;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+
+/**
+ * {@link TestEnvironment} implementation that hands out the Azure Batch runtime configuration.
+ */
+public final class AzureBatchTestEnvironment extends TestEnvironmentBase implements TestEnvironment {
+
+  /** Value returned by {@link #getTestTimeout()}: five minutes, expressed in milliseconds. */
+  private static final int TEST_TIMEOUT_MS = 5 * 60000;
+
+  /**
+   * Call-order guard: set by {@link #setUp()}, cleared by {@link #tearDown()},
+   * and asserted by the methods that must run in between.
+   */
+  private boolean ready = false;
+
+  @Override
+  public synchronized void setUp() {
+    this.ready = true;
+  }
+
+  /**
+   * Builds the runtime configuration from the configuration returned by
+   * {@code AzureBatchRuntimeConfiguration.fromEnvironment()}.
+   *
+   * @return the configuration produced by the injected AzureBatchRuntimeConfigurationProvider.
+   * @throws RuntimeException wrapping any IOException or InjectionException raised on the way.
+   */
+  @Override
+  public synchronized Configuration getRuntimeConfiguration() {
+    assert this.ready;
+    try {
+      final Configuration environmentConfiguration = AzureBatchRuntimeConfiguration.fromEnvironment();
+      final Injector environmentInjector = Tang.Factory.getTang().newInjector(environmentConfiguration);
+      return environmentInjector
+          .getInstance(AzureBatchRuntimeConfigurationProvider.class)
+          .getAzureBatchRuntimeConfiguration();
+    } catch (final IOException | InjectionException cause) {
+      throw new RuntimeException(cause);
+    }
+  }
+
+  @Override
+  public synchronized void tearDown() {
+    assert this.ready;
+    this.ready = false;
+  }
+
+  @Override
+  public int getTestTimeout() {
+    return TEST_TIMEOUT_MS;
+  }
+
+  @Override
+  public String getRuntimeName() {
+    return RuntimeIdentifier.RUNTIME_NAME;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-tests/src/test/java/org/apache/reef/tests/TestEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/TestEnvironmentFactory.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/TestEnvironmentFactory.java
index ab1454b..f716bec 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/TestEnvironmentFactory.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/TestEnvironmentFactory.java
@@ -37,8 +37,9 @@ public final class TestEnvironmentFactory {
public static TestEnvironment getNewTestEnvironment() {
final boolean isYarn = Boolean.parseBoolean(System.getenv("REEF_TEST_YARN"));
final boolean isMesos = Boolean.parseBoolean(System.getenv("REEF_TEST_MESOS"));
+ final boolean isAzBatch = Boolean.parseBoolean(System.getenv("REEF_TEST_AZBATCH"));
- if (isYarn && isMesos) {
+ if (isYarn ? (isMesos || isAzBatch) : (isMesos && isAzBatch)) {
throw new RuntimeException("Cannot test on two runtimes at once");
} else if (isYarn) {
LOG.log(Level.INFO, "Running tests on YARN");
@@ -46,6 +47,9 @@ public final class TestEnvironmentFactory {
} else if (isMesos) {
LOG.log(Level.INFO, "Running tests on Mesos");
return new MesosTestEnvironment();
+ } else if (isAzBatch) {
+ LOG.log(Level.INFO, "Running tests on Azure Batch");
+ return new AzureBatchTestEnvironment();
} else {
LOG.log(Level.INFO, "Running tests on Local");
return new LocalTestEnvironment();
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7d4431e..8d58a93 100644
--- a/pom.xml
+++ b/pom.xml
@@ -753,6 +753,13 @@ under the License.
<artifactId>mesos</artifactId>
<version>0.25.0</version>
</dependency>
+
+ <!-- Microsoft Azure Batch APIs -->
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-batch</artifactId>
+ <version>3.0.0</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -770,6 +777,7 @@ under the License.
<module>lang/java/reef-experimental</module>
<module>lang/java/reef-io</module>
<module>lang/java/reef-poison</module>
+ <module>lang/java/reef-runtime-azbatch</module>
<module>lang/java/reef-runtime-hdinsight</module>
<module>lang/java/reef-runtime-local</module>
<module>lang/java/reef-runtime-yarn</module>
[4/4] reef git commit: [REEF-1965] Implement REEF runtime for Azure
Batch
Posted by mo...@apache.org.
[REEF-1965] Implement REEF runtime for Azure Batch
* Implement REEF runtime for Azure Batch
* Add HelloReefAzBatch example that allows users to test the runtime
with their Batch account
* Ensure all tests in reef-tests package pass when run with Azure
Batch runtime.
* Azure Batch runtime security improvements and minor refactor
1. Driver now uses authentication token to communicate with Azure Batch
2. Driver now uses a Shared Access Signature to communicate with Azure Storage
3. org.apache.reef.runtime.azbatch.util package was refactored into sub-packages to better organize a growing number of classes.
4. Adding "final" keyword for necessary classes
5. Add missing annotation
* Fix race condition in AzureBatchTaskStatusAlarmHandler
JIRA: [REEF-1965](https://issues.apache.org/jira/browse/REEF-1965)
Closes #1432
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/561a336f
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/561a336f
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/561a336f
Branch: refs/heads/master
Commit: 561a336f2f0dda8f4a67a96179750a76167b038f
Parents: 7924188
Author: Reed Umbrasas <ry...@microsoft.com>
Authored: Wed Feb 28 17:29:44 2018 -0800
Committer: Sergiy Matusevych <mo...@apache.org>
Committed: Wed Mar 28 12:04:55 2018 -0700
----------------------------------------------------------------------
bin/runazuretests.sh | 52 +++
lang/common/proto/evaluator_shim.proto | 46 ++
lang/common/proto/reef_protocol.proto | 6 +
lang/java/reef-common/pom.xml | 1 +
.../reef/runtime/common/files/JobJarMaker.java | 115 +++--
.../runtime/common/launch/REEFMessageCodec.java | 9 +
lang/java/reef-examples/pom.xml | 5 +
.../reef/examples/hello/HelloReefAzBatch.java | 93 ++++
lang/java/reef-runtime-azbatch/README.md | 11 +
lang/java/reef-runtime-azbatch/pom.xml | 120 +++++
.../azbatch/AzureBatchClasspathProvider.java | 62 +++
.../azbatch/AzureBatchJVMPathProvider.java | 45 ++
...ureBatchDriverConfigurationProviderImpl.java | 100 ++++
.../client/AzureBatchJobSubmissionHandler.java | 155 +++++++
.../client/AzureBatchRuntimeConfiguration.java | 118 +++++
.../AzureBatchRuntimeConfigurationCreator.java | 84 ++++
.../AzureBatchRuntimeConfigurationProvider.java | 78 ++++
.../AzureBatchRuntimeConfigurationStatic.java | 53 +++
.../runtime/azbatch/client/package-info.java | 22 +
.../driver/AzureBatchDriverConfiguration.java | 129 ++++++
...BatchEvaluatorShimConfigurationProvider.java | 54 +++
.../driver/AzureBatchEvaluatorShimManager.java | 452 +++++++++++++++++++
.../driver/AzureBatchResourceLaunchHandler.java | 55 +++
.../driver/AzureBatchResourceManager.java | 136 ++++++
.../AzureBatchResourceManagerStartHandler.java | 47 ++
.../AzureBatchResourceManagerStopHandler.java | 51 +++
.../AzureBatchResourceReleaseHandler.java | 51 +++
.../AzureBatchResourceRequestHandler.java | 52 +++
.../AzureBatchTaskStatusAlarmHandler.java | 152 +++++++
.../azbatch/driver/REEFEventHandlers.java | 93 ++++
.../azbatch/driver/RuntimeIdentifier.java | 36 ++
.../runtime/azbatch/driver/package-info.java | 22 +
.../azbatch/evaluator/EvaluatorShim.java | 303 +++++++++++++
.../evaluator/EvaluatorShimConfiguration.java | 53 +++
.../evaluator/EvaluatorShimLauncher.java | 109 +++++
.../runtime/azbatch/evaluator/package-info.java | 22 +
.../reef/runtime/azbatch/package-info.java | 22 +
.../parameters/AzureBatchAccountKey.java | 29 ++
.../parameters/AzureBatchAccountName.java | 29 ++
.../parameters/AzureBatchAccountUri.java | 29 ++
.../azbatch/parameters/AzureBatchPoolId.java | 29 ++
.../AzureBatchTaskStatusCheckPeriod.java | 29 ++
.../parameters/AzureStorageAccountKey.java | 29 ++
.../parameters/AzureStorageAccountName.java | 29 ++
.../AzureStorageBlobSASTokenValidityHours.java | 29 ++
.../parameters/AzureStorageContainerName.java | 29 ++
.../azbatch/parameters/ContainerIdentifier.java | 29 ++
.../parameters/EvaluatorShimConfigFilePath.java | 29 ++
.../runtime/azbatch/parameters/IsWindows.java | 29 ++
.../azbatch/parameters/package-info.java | 22 +
.../azbatch/util/AzureBatchFileNames.java | 96 ++++
.../azbatch/util/RemoteIdentifierParser.java | 59 +++
.../runtime/azbatch/util/TaskStatusMapper.java | 57 +++
.../azbatch/util/batch/AzureBatchHelper.java | 172 +++++++
.../batch/IAzureBatchCredentialProvider.java | 33 ++
.../batch/SharedKeyBatchCredentialProvider.java | 59 +++
.../batch/TokenBatchCredentialProvider.java | 69 +++
.../azbatch/util/batch/package-info.java | 22 +
.../util/command/AbstractCommandBuilder.java | 120 +++++
.../azbatch/util/command/CommandBuilder.java | 56 +++
.../util/command/LinuxCommandBuilder.java | 68 +++
.../util/command/WindowsCommandBuilder.java | 74 +++
.../azbatch/util/command/package-info.java | 22 +
.../reef/runtime/azbatch/util/package-info.java | 22 +
.../util/storage/AzureStorageClient.java | 142 ++++++
.../util/storage/ICloudBlobClientProvider.java | 51 +++
...dAccessSignatureCloudBlobClientProvider.java | 110 +++++
.../storage/StorageKeyCloudBlobProvider.java | 92 ++++
.../azbatch/util/storage/package-info.java | 22 +
.../runtime/azbatch/CommandBuilderTests.java | 118 +++++
.../reef/runtime/azbatch/package-info.java | 22 +
lang/java/reef-tests/pom.xml | 5 +
.../reef/tests/AzureBatchTestEnvironment.java | 73 +++
.../reef/tests/TestEnvironmentFactory.java | 6 +-
pom.xml | 8 +
75 files changed, 4932 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/bin/runazuretests.sh
----------------------------------------------------------------------
diff --git a/bin/runazuretests.sh b/bin/runazuretests.sh
new file mode 100644
index 0000000..8553bef
--- /dev/null
+++ b/bin/runazuretests.sh
@@ -0,0 +1,52 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# EXAMPLE USAGE
+# . ./bin/runazuretests.sh
+# "./lang/java/reef-examples/target/reef-examples-0.17.0-SNAPSHOT-shaded.jar;
+# ./lang/java/reef-tests/target/reef-tests-0.17.0-SNAPSHOT-test-jar-with-dependencies.jar"
+# org.apache.reef.tests.examples.TestHelloREEF
+
+
+# RUNTIME
+
+if [ $# -ne 2 ]
+then
+  echo "Only 2 arguments are accepted - CLASSPATH and TESTCLASS"
+  # This script is meant to be sourced (". runazuretests.sh"), where "exit"
+  # would terminate the caller's shell; try "return" first, then fall back.
+  return 1 2>/dev/null || exit 1
+fi
+
+# Set REEF_TEST_AZBATCH only when the environment does not already have it
+# set to "true". The test must read the variable itself: comparing a helper
+# variable that holds the literal name "REEF_TEST_AZBATCH" never equals "true".
+if [ "${REEF_TEST_AZBATCH}" != "true" ]
+then
+  echo "Trying to set REEF_TEST_AZBATCH environment variable."
+  echo "Please run as \". runazuretests.sh\" or set it from your environment."
+  export REEF_TEST_AZBATCH=true
+fi
+
+CLASSPATH=$1
+TESTCLASS=$2
+
+CMD="java -cp ${CLASSPATH} org.junit.runner.JUnitCore ${TESTCLASS}"
+
+# printf instead of "echo -e": under /bin/sh the -e flag is not portable and
+# may be printed literally. Output is two blank lines, the banner, two blank lines.
+printf '\n\nRunning Azure Batch Tests...\n\n\n'
+echo $CMD
+# Deliberately unquoted so the string is split into the command and its arguments.
+$CMD
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/common/proto/evaluator_shim.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/evaluator_shim.proto b/lang/common/proto/evaluator_shim.proto
new file mode 100644
index 0000000..3c27b13
--- /dev/null
+++ b/lang/common/proto/evaluator_shim.proto
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "EvaluatorShimProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// Status values carried in the required `status` field of EvaluatorShimStatusProto.
+enum EvaluatorShimStatus {
+ UNKNOWN = 1;
+ ONLINE = 2;
+}
+
+// Commands carried in the required `command` field of EvaluatorShimControlProto.
+enum EvaluatorShimCommand {
+ LAUNCH_EVALUATOR = 1;
+ TERMINATE = 2;
+}
+
+// Control message for an evaluator shim; embedded in REEFMessage as the
+// optional field `evaluatorShimCommand` (see reef_protocol.proto).
+// Only `command` is required; the three string fields are optional.
+// NOTE(review): presumably the optional fields are only populated for
+// LAUNCH_EVALUATOR - confirm against the code that builds this message.
+message EvaluatorShimControlProto {
+ required EvaluatorShimCommand command = 1;
+ optional string evaluatorConfigString = 2;
+ optional string evaluatorLaunchCommand = 3;
+ optional string evaluatorFileResourcesUrl = 4;
+}
+
+// Status message for an evaluator shim; embedded in REEFMessage as the
+// optional field `evaluatorShimStatus` (see reef_protocol.proto).
+// All three fields are required.
+message EvaluatorShimStatusProto {
+ required EvaluatorShimStatus status = 1;
+ required string remoteIdentifier = 2;
+ required string containerId = 3;
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/common/proto/reef_protocol.proto
----------------------------------------------------------------------
diff --git a/lang/common/proto/reef_protocol.proto b/lang/common/proto/reef_protocol.proto
index 86675f9..1fb4489 100644
--- a/lang/common/proto/reef_protocol.proto
+++ b/lang/common/proto/reef_protocol.proto
@@ -23,6 +23,8 @@ import "evaluator_runtime.proto";
import "reef_service_protos.proto";
+import "evaluator_shim.proto";
+
option java_package = "org.apache.reef.proto";
@@ -43,4 +45,8 @@ message REEFMessage {
// Messages from evaluator_runtime.proto
optional EvaluatorControlProto evaluatorControl = 5;
optional EvaluatorHeartbeatProto evaluatorHeartBeat = 6;
+ optional EvaluatorShimControlProto evaluatorShimCommand = 7;
+
+ // Messages from evaluator_shim.proto
+ optional EvaluatorShimStatusProto evaluatorShimStatus = 8;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-common/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/pom.xml b/lang/java/reef-common/pom.xml
index 94e66c3..db1e87e 100644
--- a/lang/java/reef-common/pom.xml
+++ b/lang/java/reef-common/pom.xml
@@ -74,6 +74,7 @@ under the License.
<arg value="${protoPath}/reef_service_protos.proto"/>
<arg value="${protoPath}/evaluator_runtime.proto"/>
<arg value="${protoPath}/client_runtime.proto"/>
+ <arg value="${protoPath}/evaluator_shim.proto"/>
<arg value="${protoPath}/reef_protocol.proto"/>
</exec>
</target>
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
index 6b45f0d..9ca7c1f 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
@@ -18,6 +18,7 @@
*/
package org.apache.reef.runtime.common.files;
+import org.apache.commons.lang.StringUtils;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.annotations.audience.RuntimeAuthor;
@@ -32,6 +33,8 @@ import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -58,6 +61,15 @@ public final class JobJarMaker {
this.deleteTempFilesOnExit = deleteTempFilesOnExit;
}
+ /**
+ * Creates a new {@link JarBuilder} for assembling a JAR file.
+ *
+ * @return a new JarBuilder instance with no files and no configuration set.
+ */
+ public JarBuilder newBuilder() {
+ return new JarBuilder();
+ }
+
public static void copy(final Iterable<FileResource> files, final File destinationFolder) {
if (!destinationFolder.exists() && !destinationFolder.mkdirs()) {
@@ -96,42 +108,87 @@ public final class JobJarMaker {
final JobSubmissionEvent jobSubmissionEvent,
final Configuration driverConfiguration) throws IOException {
- // Copy all files to a local job submission folder
- final File jobSubmissionFolder = makejobSubmissionFolder();
- LOG.log(Level.FINE, "Staging submission in {0}", jobSubmissionFolder);
+ return new JarBuilder()
+ .withConfiguration(driverConfiguration)
+ .addGlobalFileSet(jobSubmissionEvent.getGlobalFileSet())
+ .addLocalFileSet(jobSubmissionEvent.getLocalFileSet())
+ .withConfigurationFileName(this.fileNames.getDriverConfigurationName())
+ .build();
+ }
+
+ private File makeJobSubmissionFolder() throws IOException {
+ return Files.createTempDirectory(this.fileNames.getJobFolderPrefix()).toFile();
+ }
- final File localFolder = new File(jobSubmissionFolder, this.fileNames.getLocalFolderName());
- final File globalFolder = new File(jobSubmissionFolder, this.fileNames.getGlobalFolderName());
+ /**
+ * Builder class for building JAR files.
+ */
+ public final class JarBuilder {
- copy(jobSubmissionEvent.getGlobalFileSet(), globalFolder);
- copy(jobSubmissionEvent.getLocalFileSet(), localFolder);
+ private final Set<FileResource> localFiles = new HashSet<>();
+ private final Set<FileResource> globalFiles = new HashSet<>();
+ private Configuration configuration = null;
+ private String configurationFilename = null;
- // Store the Driver Configuration in the JAR file.
- this.configurationSerializer.toFile(
- driverConfiguration, new File(localFolder, this.fileNames.getDriverConfigurationName()));
+ private JarBuilder() {}
- // Create a JAR File for the submission
- final File jarFile = File.createTempFile(this.fileNames.getJobFolderPrefix(), this.fileNames.getJarFileSuffix());
+ /**
+ * Adds the given files to the set that {@link #build()} copies into the local folder.
+ *
+ * @param localFileResources files to add.
+ * @return this builder.
+ */
+ public JarBuilder addLocalFileSet(final Set<FileResource> localFileResources) {
+ this.localFiles.addAll(localFileResources);
+ return this;
+ }
- LOG.log(Level.FINE, "Creating job submission jar file: {0}", jarFile);
- new JARFileMaker(jarFile).addChildren(jobSubmissionFolder).close();
+ /**
+ * Adds the given files to the set that {@link #build()} copies into the global folder.
+ *
+ * @param globalFileResources files to add.
+ * @return this builder.
+ */
+ public JarBuilder addGlobalFileSet(final Set<FileResource> globalFileResources) {
+ this.globalFiles.addAll(globalFileResources);
+ return this;
+ }
- if (this.deleteTempFilesOnExit) {
- LOG.log(Level.FINE,
- "Deleting the temporary job folder [{0}] and marking the jar file [{1}] for deletion after the JVM exits.",
- new Object[]{jobSubmissionFolder.getAbsolutePath(), jarFile.getAbsolutePath()});
- if (!jobSubmissionFolder.delete()) {
- LOG.log(Level.WARNING, "Failed to delete [{0}]", jobSubmissionFolder.getAbsolutePath());
- }
- jarFile.deleteOnExit();
- } else {
- LOG.log(Level.FINE, "Keeping the temporary job folder [{0}] and jar file [{1}] available after job submission.",
- new Object[]{jobSubmissionFolder.getAbsolutePath(), jarFile.getAbsolutePath()});
+ /**
+ * Sets the configuration that {@link #build()} serializes into the local folder.
+ * It is only written when a non-blank configuration file name has also been set.
+ *
+ * @param withConfiguration the configuration to store; replaces any earlier value.
+ * @return this builder.
+ */
+ public JarBuilder withConfiguration(final Configuration withConfiguration) {
+ this.configuration = withConfiguration;
+ return this;
}
- return jarFile;
- }
- private File makejobSubmissionFolder() throws IOException {
- return Files.createTempDirectory(this.fileNames.getJobFolderPrefix()).toFile();
+ /**
+ * Sets the name of the file, inside the local folder, that {@link #build()} writes
+ * the configuration to. A blank name causes the configuration to be skipped.
+ *
+ * @param withConfigurationFilename the file name; replaces any earlier value.
+ * @return this builder.
+ */
+ public JarBuilder withConfigurationFileName(final String withConfigurationFilename) {
+ this.configurationFilename = withConfigurationFilename;
+ return this;
+ }
+
+ /**
+ * Stages the collected files in a fresh temporary folder and packs that folder into a JAR.
+ * Global files go to the global folder and local files to the local folder. The
+ * configuration is written into the local folder only if both a configuration and a
+ * non-blank file name were supplied; otherwise it is silently omitted.
+ *
+ * @return the JAR file, created as a temporary file.
+ * @throws IOException declared for failures while creating the staging folder or JAR file.
+ */
+ public File build() throws IOException {
+ // Copy all files to a local job submission folder
+ final File jobSubmissionFolder = makeJobSubmissionFolder();
+ LOG.log(Level.FINE, "Staging submission in {0}", jobSubmissionFolder);
+
+ final File localFolder = new File(jobSubmissionFolder, JobJarMaker.this.fileNames.getLocalFolderName());
+ final File globalFolder = new File(jobSubmissionFolder, JobJarMaker.this.fileNames.getGlobalFolderName());
+
+ JobJarMaker.copy(this.globalFiles, globalFolder);
+ JobJarMaker.copy(this.localFiles, localFolder);
+
+ // Store the Configuration in the JAR file.
+ if (configuration != null && StringUtils.isNotBlank(this.configurationFilename)) {
+ JobJarMaker.this.configurationSerializer
+ .toFile(configuration, new File(localFolder, this.configurationFilename));
+ }
+
+ // Create a JAR File for the submission
+ final File jarFile = File.createTempFile(JobJarMaker.this.fileNames.getJobFolderPrefix(),
+ JobJarMaker.this.fileNames.getJarFileSuffix());
+
+ LOG.log(Level.FINE, "Creating job submission jar file: {0}", jarFile);
+ // NOTE(review): if addChildren() throws, close() is never reached - confirm whether
+ // JARFileMaker holds a stream that should be closed in a finally/try-with-resources.
+ new JARFileMaker(jarFile).addChildren(jobSubmissionFolder).close();
+
+ if (JobJarMaker.this.deleteTempFilesOnExit) {
+ LOG.log(Level.FINE,
+ "Deleting the temporary job folder [{0}] and marking the jar file [{1}] for deletion after the JVM exits.",
+ new Object[]{jobSubmissionFolder.getAbsolutePath(), jarFile.getAbsolutePath()});
+ // NOTE(review): File.delete() only removes an empty directory; the staging folder
+ // presumably still contains the local/global folders here, so this likely always
+ // takes the warning path - verify and consider a recursive delete.
+ if (!jobSubmissionFolder.delete()) {
+ LOG.log(Level.WARNING, "Failed to delete [{0}]", jobSubmissionFolder.getAbsolutePath());
+ }
+ jarFile.deleteOnExit();
+ } else {
+ LOG.log(Level.FINE, "Keeping the temporary job folder [{0}] and jar file [{1}] available after job submission.",
+ new Object[]{jobSubmissionFolder.getAbsolutePath(), jarFile.getAbsolutePath()});
+ }
+ return jarFile;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
index 4c48e0a..ef2235d 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
@@ -27,6 +27,7 @@ import org.apache.reef.annotations.audience.EvaluatorSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.proto.ClientRuntimeProtocol;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.EvaluatorShimProtocol;
import org.apache.reef.proto.REEFProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.wake.remote.Codec;
@@ -62,6 +63,10 @@ public final class REEFMessageCodec implements Codec<GeneratedMessage> {
return message.getEvaluatorControl();
} else if (message.hasEvaluatorHeartBeat()) {
return message.getEvaluatorHeartBeat();
+ } else if (message.hasEvaluatorShimCommand()) {
+ return message.getEvaluatorShimCommand();
+ } else if (message.hasEvaluatorShimStatus()) {
+ return message.getEvaluatorShimStatus();
}
throw new RuntimeException("Unable to decode a message: " + message.toString());
} catch (final InvalidProtocolBufferException e) {
@@ -83,6 +88,10 @@ public final class REEFMessageCodec implements Codec<GeneratedMessage> {
message.setEvaluatorControl((EvaluatorRuntimeProtocol.EvaluatorControlProto) msg);
} else if (msg instanceof EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto) {
message.setEvaluatorHeartBeat((EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto) msg);
+ } else if (msg instanceof EvaluatorShimProtocol.EvaluatorShimControlProto) {
+ message.setEvaluatorShimCommand((EvaluatorShimProtocol.EvaluatorShimControlProto) msg);
+ } else if (msg instanceof EvaluatorShimProtocol.EvaluatorShimStatusProto) {
+ message.setEvaluatorShimStatus((EvaluatorShimProtocol.EvaluatorShimStatusProto) msg);
} else {
throw new RuntimeException("Unable to serialize: " + msg);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-examples/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/pom.xml b/lang/java/reef-examples/pom.xml
index fafb337..0668452 100644
--- a/lang/java/reef-examples/pom.xml
+++ b/lang/java/reef-examples/pom.xml
@@ -43,6 +43,11 @@ under the License.
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-azbatch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>reef-runtime-local</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefAzBatch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefAzBatch.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefAzBatch.java
new file mode 100644
index 0000000..fc6c661
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefAzBatch.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.REEF;
+import org.apache.reef.runtime.azbatch.client.AzureBatchRuntimeConfiguration;
+import org.apache.reef.runtime.azbatch.client.AzureBatchRuntimeConfigurationProvider;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A main() for running hello REEF in Azure Batch.
+ */
+public final class HelloReefAzBatch {
+
+ private static final Logger LOG = Logger.getLogger(HelloReefAzBatch.class.getName());
+
+ /**
+ * Builds the runtime configuration for Azure Batch.
+ *
+ * @return the configuration of the runtime.
+ * @throws IOException if the REEF_AZBATCH_CONF environment variable is unset or its file can't be read.
+ */
+ private static Configuration getEnvironmentConfiguration() throws IOException {
+ return AzureBatchRuntimeConfiguration.fromEnvironment();
+ }
+
+ /**
+ * Builds and returns driver configuration for HelloREEF driver.
+ *
+ * @return the configuration of the HelloREEF driver.
+ */
+ private static Configuration getDriverConfiguration() {
+ return DriverConfiguration.CONF
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+ .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(HelloDriver.class))
+ .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+ .build();
+ }
+
+ /**
+ * Start the Hello REEF job with the Azure Batch runtime.
+ *
+ * @param args command line parameters.
+ * @throws InjectionException configuration error.
+ * @throws IOException if the Azure Batch runtime configuration cannot be loaded from the environment.
+ */
+ public static void main(final String[] args) throws InjectionException, IOException {
+
+ final Configuration partialConfiguration = getEnvironmentConfiguration();
+ final Injector injector = Tang.Factory.getTang().newInjector(partialConfiguration);
+ final AzureBatchRuntimeConfigurationProvider runtimeConfigurationProvider =
+ injector.getInstance(AzureBatchRuntimeConfigurationProvider.class);
+ final Configuration driverConfiguration = getDriverConfiguration();
+
+ try (final REEF reef = Tang.Factory.getTang().newInjector(
+ runtimeConfigurationProvider.getAzureBatchRuntimeConfiguration()).getInstance(REEF.class)) {
+ reef.submit(driverConfiguration);
+ }
+ LOG.log(Level.INFO, "Job Submitted");
+ }
+
+ /**
+ * Private constructor.
+ */
+ private HelloReefAzBatch() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/README.md
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/README.md b/lang/java/reef-runtime-azbatch/README.md
new file mode 100644
index 0000000..db40852
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/README.md
@@ -0,0 +1,11 @@
+REEF Runtime for Azure Batch
+============================
+
+This folder contains support for running Apache [REEF] applications on Azure Batch. It
+uses the Azure Batch APIs defined [here][azbatchapi] via their [Java
+implementation][azbatchjava].
+
+
+[azbatchapi]: https://docs.microsoft.com/en-us/rest/api/batchservice/
+[azbatchjava]: https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.batch
+[REEF]: https://reef.apache.org
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/pom.xml b/lang/java/reef-runtime-azbatch/pom.xml
new file mode 100644
index 0000000..0280e8f
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-project</artifactId>
+ <version>0.17.0-SNAPSHOT</version>
+ <relativePath>../../..</relativePath>
+ </parent>
+
+ <properties>
+ <rootPath>${basedir}/../../..</rootPath>
+ </properties>
+
+ <artifactId>reef-runtime-azbatch</artifactId>
+ <name>REEF Runtime for Azure Batch</name>
+ <description>An implementation of REEF to run on Azure Batch.</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.windowsazure.storage</groupId>
+ <artifactId>microsoft-windowsazure-storage-sdk</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Microsoft Azure Batch APIs -->
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-batch</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>2.7.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.7.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>2.7.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>20.0</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.10</version>
+ </dependency>
+ <!-- End of Microsoft Azure Batch APIs -->
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <targetPath>META-INF/</targetPath>
+ <filtering>false</filtering>
+ <directory>${basedir}/conf</directory>
+ <includes>
+ <include>*.xml</include>
+ <include>*.properties</include>
+ </includes>
+ <excludes>
+ </excludes>
+ </resource>
+ </resources>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <configLocation>lang/java/reef-common/src/main/resources/checkstyle-strict.xml</configLocation>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/AzureBatchClasspathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/AzureBatchClasspathProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/AzureBatchClasspathProvider.java
new file mode 100644
index 0000000..bf37fe0
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/AzureBatchClasspathProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Access to the classpath according to the REEF file system standard.
+ */
+@Private
+public final class AzureBatchClasspathProvider implements RuntimeClasspathProvider {
+
+ private final List<String> classPathPrefix;
+ private final List<String> classPathSuffix;
+
+ @Inject
+ private AzureBatchClasspathProvider() {
+ this.classPathPrefix = new ArrayList<>();
+ this.classPathSuffix = new ArrayList<>();
+ }
+
+ @Override
+ public List<String> getDriverClasspathPrefix() {
+ return this.classPathPrefix;
+ }
+
+ @Override
+ public List<String> getDriverClasspathSuffix() {
+ return this.classPathSuffix;
+ }
+
+ @Override
+ public List<String> getEvaluatorClasspathPrefix() {
+ return this.classPathPrefix;
+ }
+
+ @Override
+ public List<String> getEvaluatorClasspathSuffix() {
+ return this.classPathSuffix;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/AzureBatchJVMPathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/AzureBatchJVMPathProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/AzureBatchJVMPathProvider.java
new file mode 100644
index 0000000..f78f62c
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/AzureBatchJVMPathProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.files.RuntimePathProvider;
+
+import javax.inject.Inject;
+
+/**
+ * Supplies the java binary's path for Azure Batch.
+ */
+@Private
+public final class AzureBatchJVMPathProvider implements RuntimePathProvider {
+
+ @Inject
+ public AzureBatchJVMPathProvider() {
+ }
+
+ @Override
+ public String getPath() {
+ return "java";
+ }
+
+ @Override
+ public String toString() {
+ return getPath();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java
new file mode 100644
index 0000000..0887e9d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.client;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.driver.AzureBatchDriverConfiguration;
+import org.apache.reef.runtime.azbatch.driver.RuntimeIdentifier;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountName;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
+import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountUri;
+import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountName;
+import org.apache.reef.runtime.azbatch.parameters.AzureStorageContainerName;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.net.URI;
+
+/**
+ * Configuration provider for the Azure Batch runtime.
+ */
+@Private
+public final class AzureBatchDriverConfigurationProviderImpl implements DriverConfigurationProvider {
+
+ private final double jvmSlack;
+ private final String azureBatchAccountUri;
+ private final String azureBatchAccountName;
+ private final String azureBatchPoolId;
+ private final String azureStorageAccountName;
+ private final String azureStorageContainerName;
+ private final CommandBuilder commandBuilder;
+
+ @Inject
+ private AzureBatchDriverConfigurationProviderImpl(
+ @Parameter(JVMHeapSlack.class) final double jvmSlack,
+ @Parameter(AzureBatchAccountUri.class) final String azureBatchAccountUri,
+ @Parameter(AzureBatchAccountName.class) final String azureBatchAccountName,
+ @Parameter(AzureBatchPoolId.class) final String azureBatchPoolId,
+ @Parameter(AzureStorageAccountName.class) final String azureStorageAccountName,
+ @Parameter(AzureStorageContainerName.class) final String azureStorageContainerName,
+ final CommandBuilder commandBuilder) {
+ this.jvmSlack = jvmSlack;
+ this.azureBatchAccountUri = azureBatchAccountUri;
+ this.azureBatchAccountName = azureBatchAccountName;
+ this.azureBatchPoolId = azureBatchPoolId;
+ this.azureStorageAccountName = azureStorageAccountName;
+ this.azureStorageContainerName = azureStorageContainerName;
+ this.commandBuilder = commandBuilder;
+ }
+
+ /**
+ * Assembles the Driver configuration.
+ *
+ * @param jobFolder the job folder.
+ * @param clientRemoteId the client remote id.
+ * @param jobId the job id.
+ * @param applicationConfiguration the application configuration.
+ * @return the Driver configuration.
+ */
+ @Override
+ public Configuration getDriverConfiguration(final URI jobFolder,
+ final String clientRemoteId,
+ final String jobId,
+ final Configuration applicationConfiguration) {
+ return Configurations.merge(
+ AzureBatchDriverConfiguration.CONF.getBuilder()
+ .bindImplementation(CommandBuilder.class, this.commandBuilder.getClass()).build()
+ .set(AzureBatchDriverConfiguration.JOB_IDENTIFIER, jobId)
+ .set(AzureBatchDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
+ .set(AzureBatchDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+ .set(AzureBatchDriverConfiguration.RUNTIME_NAME, RuntimeIdentifier.RUNTIME_NAME)
+ .set(AzureBatchDriverConfiguration.AZURE_BATCH_ACCOUNT_URI, this.azureBatchAccountUri)
+ .set(AzureBatchDriverConfiguration.AZURE_BATCH_ACCOUNT_NAME, this.azureBatchAccountName)
+ .set(AzureBatchDriverConfiguration.AZURE_BATCH_POOL_ID, this.azureBatchPoolId)
+ .set(AzureBatchDriverConfiguration.AZURE_STORAGE_ACCOUNT_NAME, this.azureStorageAccountName)
+ .set(AzureBatchDriverConfiguration.AZURE_STORAGE_CONTAINER_NAME, this.azureStorageContainerName)
+ .build(),
+ applicationConfiguration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java
new file mode 100644
index 0000000..cbb8a8c
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.client;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.azbatch.util.storage.AzureStorageClient;
+import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.JobJarMaker;
+import org.apache.reef.tang.Configuration;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A {@link JobSubmissionHandler} implementation for Azure Batch runtime.
+ */
+@Private
+public final class AzureBatchJobSubmissionHandler implements JobSubmissionHandler {
+
+ private static final Logger LOG = Logger.getLogger(AzureBatchJobSubmissionHandler.class.getName());
+
+ /**
+ * Maximum number of characters allowed in Azure Batch job name. This limit is imposed by Azure Batch.
+ */
+ private static final int MAX_CHARS_JOB_NAME = 64;
+
+ private String applicationId;
+
+ private final AzureStorageClient azureStorageClient;
+ private final DriverConfigurationProvider driverConfigurationProvider;
+ private final JobJarMaker jobJarMaker;
+ private final CommandBuilder launchCommandBuilder;
+ private final AzureBatchFileNames azureBatchFileNames;
+ private final AzureBatchHelper azureBatchHelper;
+
+ @Inject
+ AzureBatchJobSubmissionHandler(
+ final AzureStorageClient azureStorageClient,
+ final DriverConfigurationProvider driverConfigurationProvider,
+ final JobJarMaker jobJarMaker,
+ final CommandBuilder launchCommandBuilder,
+ final AzureBatchFileNames azureBatchFileNames,
+ final AzureBatchHelper azureBatchHelper) {
+ this.azureStorageClient = azureStorageClient;
+ this.driverConfigurationProvider = driverConfigurationProvider;
+ this.jobJarMaker = jobJarMaker;
+ this.launchCommandBuilder = launchCommandBuilder;
+ this.azureBatchHelper = azureBatchHelper;
+ this.azureBatchFileNames = azureBatchFileNames;
+ }
+
+ /**
+ * Returns REEF application id (which corresponds to Azure Batch job id) or null if the application hasn't been
+ * submitted yet.
+ *
+ * @return REEF application id.
+ */
+ @Override
+ public String getApplicationId() {
+ return this.applicationId;
+ }
+
+ /**
+ * Closes the resources.
+ *
+ * @throws Exception
+ */
+ @Override
+ public void close() throws Exception {
+ LOG.log(Level.INFO, "Closing " + AzureBatchJobSubmissionHandler.class.getName());
+ }
+
+ /**
+ * Invoked when JobSubmissionEvent is triggered: builds the job JAR, uploads it to Azure Storage and submits the job.
+ * An {@link IOException} raised along the way is logged and rethrown wrapped in a {@link RuntimeException}.
+ * @param jobSubmissionEvent triggered job submission event.
+ */
+ @Override
+ public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
+ LOG.log(Level.FINEST, "Submitting job: {0}", jobSubmissionEvent);
+
+ try {
+ this.applicationId = createApplicationId(jobSubmissionEvent);
+ final String folderName = this.azureBatchFileNames.getStorageJobFolder(this.applicationId);
+
+ LOG.log(Level.FINE, "Creating a job folder on Azure at: {0}.", folderName);
+ final URI jobFolderURL = this.azureStorageClient.getJobSubmissionFolderUri(folderName);
+
+ LOG.log(Level.FINE, "Getting a shared access signature for {0}.", folderName);
+ final String storageContainerSAS = this.azureStorageClient.createContainerSharedAccessSignature();
+
+ LOG.log(Level.FINE, "Assembling Configuration for the Driver.");
+ final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionEvent, this.applicationId,
+ jobFolderURL);
+
+ LOG.log(Level.FINE, "Making Job JAR.");
+ final File jobSubmissionJarFile =
+ this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration);
+
+ LOG.log(Level.FINE, "Uploading Job JAR to Azure.");
+ final URI jobJarSasUri = this.azureStorageClient.uploadFile(folderName, jobSubmissionJarFile);
+
+ LOG.log(Level.FINE, "Assembling application submission.");
+ final String command = this.launchCommandBuilder.buildDriverCommand(jobSubmissionEvent);
+
+ this.azureBatchHelper.submitJob(getApplicationId(), storageContainerSAS, jobJarSasUri, command);
+
+ } catch (final IOException e) {
+ LOG.log(Level.SEVERE, "Error submitting Azure Batch request", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Configuration makeDriverConfiguration(
+ final JobSubmissionEvent jobSubmissionEvent,
+ final String appId,
+ final URI jobFolderURL) {
+ return this.driverConfigurationProvider.getDriverConfiguration(
+ jobFolderURL, jobSubmissionEvent.getRemoteId(), appId, jobSubmissionEvent.getConfiguration());
+ }
+
+ private String createApplicationId(final JobSubmissionEvent jobSubmissionEvent) {
+ String uuid = UUID.randomUUID().toString();
+ String jobIdentifier = jobSubmissionEvent.getIdentifier();
+ String jobNameShort = jobIdentifier.length() + 1 + uuid.length() < MAX_CHARS_JOB_NAME ?
+ jobIdentifier : jobIdentifier.substring(0, MAX_CHARS_JOB_NAME - uuid.length() - 1);
+ return jobNameShort + "-" + uuid;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java
new file mode 100644
index 0000000..541f8c4
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.client;
+
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.RequiredParameter;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Configuration Module for the Azure Batch runtime.
+ */
+@Public
+public final class AzureBatchRuntimeConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * The Azure Batch account URI.
+ */
+ public static final RequiredParameter<String> AZURE_BATCH_ACCOUNT_URI = new RequiredParameter<>();
+
+ /**
+ * The Azure Batch account name.
+ */
+ public static final RequiredParameter<String> AZURE_BATCH_ACCOUNT_NAME = new RequiredParameter<>();
+
+ /**
+ * The Azure Batch account key.
+ */
+ public static final RequiredParameter<String> AZURE_BATCH_ACCOUNT_KEY = new RequiredParameter<>();
+
+ /**
+ * The Azure Batch pool ID.
+ */
+ public static final RequiredParameter<String> AZURE_BATCH_POOL_ID = new RequiredParameter<>();
+
+ /**
+ * The environment variable that holds the path to the default configuration file.
+ */
+ public static final String AZBATCH_CONFIGURATION_FILE_ENVIRONMENT_VARIABLE = "REEF_AZBATCH_CONF";
+
+ /**
+ * The Azure Storage account name.
+ */
+ public static final RequiredParameter<String> AZURE_STORAGE_ACCOUNT_NAME = new RequiredParameter<>();
+
+ /**
+ * The Azure Storage account key.
+ */
+ public static final RequiredParameter<String> AZURE_STORAGE_ACCOUNT_KEY = new RequiredParameter<>();
+
+ /**
+ * The Azure Storage container name.
+ */
+ public static final RequiredParameter<String> AZURE_STORAGE_CONTAINER_NAME = new RequiredParameter<>();
+
+ /**
+ * Create a {@link Configuration} object from an Avro configuration file.
+ *
+ * @param file the configuration file.
+ * @return the configuration object.
+ * @throws IOException if the file can't be read
+ */
+ public static Configuration fromTextFile(final File file) throws IOException {
+ return new AvroConfigurationSerializer().fromTextFile(file);
+ }
+
+ /**
+ * Create a {@link Configuration} object from the file named by the
+ * {@link #AZBATCH_CONFIGURATION_FILE_ENVIRONMENT_VARIABLE} environment variable.
+ *
+ * @return the configuration object.
+ * @throws IOException if the environment variable is not set or the file it points to can't be read.
+ * @see #AZBATCH_CONFIGURATION_FILE_ENVIRONMENT_VARIABLE
+ */
+ public static Configuration fromEnvironment() throws IOException {
+
+ final String configurationPath =
+ System.getenv(AZBATCH_CONFIGURATION_FILE_ENVIRONMENT_VARIABLE);
+
+ if (null == configurationPath) {
+ throw new IOException("Environment Variable " +
+ AZBATCH_CONFIGURATION_FILE_ENVIRONMENT_VARIABLE +
+ " not set.");
+ }
+
+ final File configurationFile = new File(configurationPath);
+ if (!configurationFile.canRead()) {
+ throw new IOException("Environment Variable " +
+ AZBATCH_CONFIGURATION_FILE_ENVIRONMENT_VARIABLE +
+ " points to a file " + configurationFile.getAbsolutePath() +
+ " which can't be read."
+ );
+ }
+
+ return fromTextFile(configurationFile);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java
new file mode 100644
index 0000000..0d8860b
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.client;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountKey;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountName;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountUri;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
+import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
+import org.apache.reef.runtime.azbatch.util.command.LinuxCommandBuilder;
+import org.apache.reef.runtime.azbatch.util.command.WindowsCommandBuilder;
+import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountKey;
+import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountName;
+import org.apache.reef.runtime.azbatch.parameters.AzureStorageContainerName;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+
+/**
+ * Class that builds the ConfigurationModule for Azure Batch runtime.
+ *
+ * <p>The module differs by the operating system of the pool nodes (it binds a different
+ * {@link CommandBuilder} implementation), so one module is cached per operating system.
+ */
+@Private
+public final class AzureBatchRuntimeConfigurationCreator {
+
+  /**
+   * The cached ConfigurationModule for pools whose nodes run Windows, or null if not yet created.
+   */
+  private static ConfigurationModule windowsConf;
+
+  /**
+   * The cached ConfigurationModule for pools whose nodes do not run Windows, or null if not yet created.
+   */
+  private static ConfigurationModule linuxConf;
+
+  /**
+   * Get or create a {@link ConfigurationModule} for the Azure Batch runtime.
+   *
+   * <p>The result is cached separately for each value of {@code isWindows}, so a call made for
+   * one operating system never returns the module that was created for the other one.
+   *
+   * @param isWindows true if Azure Batch pool nodes run Windows, false otherwise.
+   * @return the configuration module object.
+   */
+  public static ConfigurationModule getOrCreateAzureBatchRuntimeConfiguration(final boolean isWindows) {
+    if (isWindows) {
+      if (AzureBatchRuntimeConfigurationCreator.windowsConf == null) {
+        AzureBatchRuntimeConfigurationCreator.windowsConf = createConfigurationModule(WindowsCommandBuilder.class);
+      }
+      return AzureBatchRuntimeConfigurationCreator.windowsConf;
+    }
+
+    if (AzureBatchRuntimeConfigurationCreator.linuxConf == null) {
+      AzureBatchRuntimeConfigurationCreator.linuxConf = createConfigurationModule(LinuxCommandBuilder.class);
+    }
+    return AzureBatchRuntimeConfigurationCreator.linuxConf;
+  }
+
+  /**
+   * Build the Azure Batch runtime ConfigurationModule with the given command builder bound.
+   *
+   * @param commandBuilderClass the {@link CommandBuilder} implementation to bind.
+   * @return a newly built configuration module.
+   */
+  private static ConfigurationModule createConfigurationModule(
+      final Class<? extends CommandBuilder> commandBuilderClass) {
+
+    // Static bindings plus the OS-specific command builder.
+    final ConfigurationModule module = AzureBatchRuntimeConfigurationStatic.CONF
+        .bindImplementation(CommandBuilder.class, commandBuilderClass)
+        .build();
+
+    return new AzureBatchRuntimeConfiguration()
+        .merge(module)
+        .bindNamedParameter(AzureBatchAccountName.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_NAME)
+        .bindNamedParameter(AzureBatchAccountUri.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_URI)
+        .bindNamedParameter(AzureBatchAccountKey.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_KEY)
+        .bindNamedParameter(AzureBatchPoolId.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_POOL_ID)
+        .bindNamedParameter(AzureStorageAccountName.class, AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_NAME)
+        .bindNamedParameter(AzureStorageAccountKey.class, AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_KEY)
+        .bindNamedParameter(
+            AzureStorageContainerName.class, AzureBatchRuntimeConfiguration.AZURE_STORAGE_CONTAINER_NAME)
+        .build();
+  }
+
+  /*
+   * Private constructor since this is a utility class.
+   */
+  private AzureBatchRuntimeConfigurationCreator() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java
new file mode 100644
index 0000000..c78e9e3
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.client;
+
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.runtime.azbatch.parameters.*;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Class that provides the runtime configuration for Azure Batch.
+ *
+ * <p>The Azure Batch account, pool, Azure Storage and operating-system settings received by
+ * the injected constructor are the values written into the configuration this class returns.
+ */
+@Public
+public final class AzureBatchRuntimeConfigurationProvider {
+
+  private final String azureBatchAccountName;
+  private final String azureBatchAccountKey;
+  private final String azureBatchAccountUri;
+  private final String azureBatchPoolId;
+  private final String azureStorageAccountName;
+  private final String azureStorageAccountKey;
+  private final String azureStorageContainerName;
+  private final Boolean isWindows;
+
+  /**
+   * Private constructor.
+   */
+  @Inject
+  private AzureBatchRuntimeConfigurationProvider(
+      @Parameter(AzureBatchAccountName.class) final String azureBatchAccountName,
+      @Parameter(AzureBatchAccountKey.class) final String azureBatchAccountKey,
+      @Parameter(AzureBatchAccountUri.class) final String azureBatchAccountUri,
+      @Parameter(AzureBatchPoolId.class) final String azureBatchPoolId,
+      @Parameter(AzureStorageAccountName.class) final String azureStorageAccountName,
+      @Parameter(AzureStorageAccountKey.class) final String azureStorageAccountKey,
+      @Parameter(AzureStorageContainerName.class) final String azureStorageContainerName,
+      @Parameter(IsWindows.class) final Boolean isWindows) {
+    // Target operating system of the pool nodes.
+    this.isWindows = isWindows;
+
+    // Azure Batch account and pool.
+    this.azureBatchAccountName = azureBatchAccountName;
+    this.azureBatchAccountKey = azureBatchAccountKey;
+    this.azureBatchAccountUri = azureBatchAccountUri;
+    this.azureBatchPoolId = azureBatchPoolId;
+
+    // Azure Storage account and container.
+    this.azureStorageAccountName = azureStorageAccountName;
+    this.azureStorageAccountKey = azureStorageAccountKey;
+    this.azureStorageContainerName = azureStorageContainerName;
+  }
+
+  /**
+   * Build the Azure Batch runtime {@link Configuration} from the injected values.
+   *
+   * @return the runtime configuration with all account, pool and storage parameters set.
+   */
+  public Configuration getAzureBatchRuntimeConfiguration() {
+    return AzureBatchRuntimeConfigurationCreator.getOrCreateAzureBatchRuntimeConfiguration(isWindows)
+        .set(AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_NAME, azureBatchAccountName)
+        .set(AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_KEY, azureBatchAccountKey)
+        .set(AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_URI, azureBatchAccountUri)
+        .set(AzureBatchRuntimeConfiguration.AZURE_BATCH_POOL_ID, azureBatchPoolId)
+        .set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_NAME, azureStorageAccountName)
+        .set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_KEY, azureStorageAccountKey)
+        .set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_CONTAINER_NAME, azureStorageContainerName)
+        .build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationStatic.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationStatic.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationStatic.java
new file mode 100644
index 0000000..52d27b8
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationStatic.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.client;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.AzureBatchClasspathProvider;
+import org.apache.reef.runtime.azbatch.AzureBatchJVMPathProvider;
+import org.apache.reef.runtime.azbatch.util.batch.IAzureBatchCredentialProvider;
+import org.apache.reef.runtime.azbatch.util.batch.SharedKeyBatchCredentialProvider;
+import org.apache.reef.runtime.azbatch.util.storage.ICloudBlobClientProvider;
+import org.apache.reef.runtime.azbatch.util.storage.StorageKeyCloudBlobProvider;
+import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.files.RuntimePathProvider;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.util.logging.LoggingSetup;
+
+/**
+ * The static part of the AzureBatchRuntimeConfiguration.
+ * <p>
+ * Holds the bindings that do not depend on the operating system of the pool nodes.
+ * {@link #CONF} is deliberately left as an unbuilt {@link ConfigurationModuleBuilder}:
+ * AzureBatchRuntimeConfigurationCreator adds the CommandBuilder binding to it before building.
+ */
+@Private
+public class AzureBatchRuntimeConfigurationStatic extends ConfigurationModuleBuilder {
+ // Runs when this class is initialized, ahead of the CONF initializer declared below.
+ static {
+ LoggingSetup.setupCommonsLogging();
+ }
+
+ /**
+ * Builder pre-populated with the common runtime configuration and the Azure Batch client-side
+ * implementations (job submission, shared-key Batch credentials, storage-key blob client,
+ * driver configuration provider, classpath and JVM path providers).
+ */
+ public static final ConfigurationModuleBuilder CONF = new AzureBatchRuntimeConfigurationStatic()
+ .merge(CommonRuntimeConfiguration.CONF)
+ .bindImplementation(JobSubmissionHandler.class, AzureBatchJobSubmissionHandler.class)
+ .bindImplementation(IAzureBatchCredentialProvider.class, SharedKeyBatchCredentialProvider.class)
+ .bindImplementation(ICloudBlobClientProvider.class, StorageKeyCloudBlobProvider.class)
+ .bindImplementation(DriverConfigurationProvider.class, AzureBatchDriverConfigurationProviderImpl.class)
+ .bindImplementation(RuntimeClasspathProvider.class, AzureBatchClasspathProvider.class)
+ .bindImplementation(RuntimePathProvider.class, AzureBatchJVMPathProvider.class);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/package-info.java
new file mode 100644
index 0000000..b0e6df8
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Client for the REEF runtime for Azure Batch.
+ */
+package org.apache.reef.runtime.azbatch.client;
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java
new file mode 100644
index 0000000..9a096be
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.runtime.azbatch.AzureBatchClasspathProvider;
+import org.apache.reef.runtime.azbatch.AzureBatchJVMPathProvider;
+import org.apache.reef.runtime.azbatch.parameters.*;
+import org.apache.reef.runtime.azbatch.util.batch.IAzureBatchCredentialProvider;
+import org.apache.reef.runtime.azbatch.util.batch.TokenBatchCredentialProvider;
+import org.apache.reef.runtime.azbatch.util.storage.ICloudBlobClientProvider;
+import org.apache.reef.runtime.azbatch.util.storage.SharedAccessSignatureCloudBlobClientProvider;
+import org.apache.reef.runtime.common.driver.api.*;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorTimeout;
+import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.files.RuntimePathProvider;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+
+/**
+ * ConfigurationModule to create Azure Batch Driver configurations.
+ * <p>
+ * Declares the required and optional parameters of the driver and, in {@link #CONF}, binds them
+ * to the corresponding named parameters together with the Azure Batch driver-side handlers.
+ */
+@Public
+@ClientSide
+public final class AzureBatchDriverConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * The job identifier. In {@link #CONF} it is bound to both {@link JobIdentifier} and {@link LaunchID}.
+ *
+ * @see JobIdentifier
+ */
+ public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>();
+
+ /**
+ * The runtime name, added as an entry of the {@link DefinedRuntimes} set.
+ *
+ * @see DefinedRuntimes
+ */
+ public static final RequiredParameter<String> RUNTIME_NAME = new RequiredParameter<>();
+
+ /**
+ * @see EvaluatorTimeout
+ */
+ public static final OptionalParameter<Long> EVALUATOR_TIMEOUT = new OptionalParameter<>();
+
+ /**
+ * The client remote identifier.
+ * In {@link #CONF} it is bound to both {@link ClientRemoteIdentifier} and {@link ErrorHandlerRID}.
+ */
+ public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>();
+
+ /**
+ * The Azure Batch account URI to be used by REEF.
+ */
+ public static final RequiredParameter<String> AZURE_BATCH_ACCOUNT_URI = new RequiredParameter<>();
+
+ /**
+ * The Azure Batch account name to be used by REEF.
+ */
+ public static final RequiredParameter<String> AZURE_BATCH_ACCOUNT_NAME = new RequiredParameter<>();
+
+ /**
+ * The Azure Batch Pool ID.
+ */
+ public static final RequiredParameter<String> AZURE_BATCH_POOL_ID = new RequiredParameter<>();
+
+ /**
+ * The name of the Azure Storage account.
+ */
+ public static final RequiredParameter<String> AZURE_STORAGE_ACCOUNT_NAME = new RequiredParameter<>();
+
+ /**
+ * The name of the Azure Storage account container.
+ */
+ public static final RequiredParameter<String> AZURE_STORAGE_CONTAINER_NAME = new RequiredParameter<>();
+
+ /**
+ * The fraction of the container memory NOT to use for the Java Heap.
+ */
+ public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
+
+ /**
+ * The driver-side configuration module. It binds the token-based Batch credential provider and
+ * the shared-access-signature blob client provider, the Azure Batch resource handlers, and the
+ * parameters declared above.
+ */
+ public static final ConfigurationModule CONF = new AzureBatchDriverConfiguration()
+ .bindImplementation(IAzureBatchCredentialProvider.class, TokenBatchCredentialProvider.class)
+ .bindImplementation(ICloudBlobClientProvider.class, SharedAccessSignatureCloudBlobClientProvider.class)
+ .bindImplementation(ResourceLaunchHandler.class, AzureBatchResourceLaunchHandler.class)
+ .bindImplementation(ResourceReleaseHandler.class, AzureBatchResourceReleaseHandler.class)
+ .bindImplementation(ResourceRequestHandler.class, AzureBatchResourceRequestHandler.class)
+ .bindImplementation(ResourceManagerStartHandler.class, AzureBatchResourceManagerStartHandler.class)
+ .bindImplementation(ResourceManagerStopHandler.class, AzureBatchResourceManagerStopHandler.class)
+
+ // Bind Azure Batch Configuration Parameters
+ .bindNamedParameter(AzureBatchAccountUri.class, AZURE_BATCH_ACCOUNT_URI)
+ .bindNamedParameter(AzureBatchAccountName.class, AZURE_BATCH_ACCOUNT_NAME)
+ .bindNamedParameter(AzureBatchPoolId.class, AZURE_BATCH_POOL_ID)
+ .bindNamedParameter(AzureStorageAccountName.class, AZURE_STORAGE_ACCOUNT_NAME)
+ .bindNamedParameter(AzureStorageContainerName.class, AZURE_STORAGE_CONTAINER_NAME)
+
+ // Bind the fields bound in AbstractDriverRuntimeConfiguration
+ .bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER)
+ .bindNamedParameter(LaunchID.class, JOB_IDENTIFIER)
+ .bindNamedParameter(EvaluatorTimeout.class, EVALUATOR_TIMEOUT)
+ .bindNamedParameter(ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER)
+ .bindNamedParameter(ErrorHandlerRID.class, CLIENT_REMOTE_IDENTIFIER)
+ .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+ .bindImplementation(RuntimeClasspathProvider.class, AzureBatchClasspathProvider.class)
+ .bindImplementation(RuntimePathProvider.class, AzureBatchJVMPathProvider.class)
+ .bindSetEntry(DefinedRuntimes.class, RUNTIME_NAME)
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java
new file mode 100644
index 0000000..85b9103
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.evaluator.EvaluatorShimConfiguration;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.Configuration;
+
+import javax.inject.Inject;
+
+/**
+ * Configuration provider for the Azure Batch evaluator shim.
+ */
+@Private
+public class AzureBatchEvaluatorShimConfigurationProvider {
+
+  private final RemoteManager remoteManager;
+
+  @Inject
+  AzureBatchEvaluatorShimConfigurationProvider(final RemoteManager remoteManager) {
+    this.remoteManager = remoteManager;
+  }
+
+  /**
+   * Builds the {@link Configuration} for one evaluator shim. The result carries the identifier
+   * reported by the injected remote manager and the given container id; it is the object that
+   * gets serialized into shim.config and used to launch the evaluator shim.
+   *
+   * @param containerId id of the container for which the shim is being launched.
+   * @return A {@link Configuration} object needed to launch the evaluator shim.
+   */
+  public Configuration getConfiguration(final String containerId) {
+    // Identifier handed to the shim as the driver remote identifier.
+    final String driverRemoteId = remoteManager.getMyIdentifier();
+
+    return EvaluatorShimConfiguration.CONF
+        .set(EvaluatorShimConfiguration.DRIVER_REMOTE_IDENTIFIER, driverRemoteId)
+        .set(EvaluatorShimConfiguration.CONTAINER_IDENTIFIER, containerId)
+        .build();
+  }
+}
[2/4] reef git commit: [REEF-1965] Implement REEF runtime for Azure
Batch
Posted by mo...@apache.org.
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchTaskStatusCheckPeriod.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchTaskStatusCheckPeriod.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchTaskStatusCheckPeriod.java
new file mode 100644
index 0000000..b4dc7e4
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchTaskStatusCheckPeriod.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The period to check for task status from Azure Batch in ms.
+ * <p>
+ * Tang named parameter of type {@code Integer}; the annotation declares a default of 5000.
+ */
+@NamedParameter(doc = "The period to check for task status from Azure Batch in ms.", default_value = "5000")
+public final class AzureBatchTaskStatusCheckPeriod implements Name<Integer> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageAccountKey.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageAccountKey.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageAccountKey.java
new file mode 100644
index 0000000..f82cf50
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageAccountKey.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The Azure Storage account key.
+ * <p>
+ * Tang named parameter of type {@code String}; the annotation declares no default value.
+ */
+@NamedParameter(doc = "The Azure Storage account key.")
+public final class AzureStorageAccountKey implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageAccountName.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageAccountName.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageAccountName.java
new file mode 100644
index 0000000..8e0af3e
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageAccountName.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The Azure Storage account name.
+ * <p>
+ * Tang named parameter of type {@code String}; the annotation declares no default value.
+ */
+@NamedParameter(doc = "The Azure Storage account name.")
+public final class AzureStorageAccountName implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageBlobSASTokenValidityHours.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageBlobSASTokenValidityHours.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageBlobSASTokenValidityHours.java
new file mode 100644
index 0000000..4bd8644
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageBlobSASTokenValidityHours.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The Azure Blob Shared Access Signature token validity in hours.
+ * <p>
+ * Tang named parameter of type {@code Integer}; the annotation declares a default of 1.
+ * The default is declared with the single-valued {@code default_value} attribute, as is done
+ * for the other scalar parameter with a default in this package.
+ */
+@NamedParameter(doc = "The Azure Blob Shared Access Signature token validity in hours.", default_value = "1")
+public final class AzureStorageBlobSASTokenValidityHours implements Name<Integer> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageContainerName.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageContainerName.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageContainerName.java
new file mode 100644
index 0000000..ef32d9e
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureStorageContainerName.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter holding the Azure Storage container name.
+ * Bound to a {@link String}; no default value is declared.
+ */
+@NamedParameter(doc = "The Azure Storage container name.")
+public final class AzureStorageContainerName implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerIdentifier.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerIdentifier.java
new file mode 100644
index 0000000..00139db
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerIdentifier.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter holding the container identifier for the evaluator shim.
+ * Bound to a {@link String}; no default value is declared.
+ */
+@NamedParameter(doc = "The container identifier.")
+public final class ContainerIdentifier implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/EvaluatorShimConfigFilePath.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/EvaluatorShimConfigFilePath.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/EvaluatorShimConfigFilePath.java
new file mode 100644
index 0000000..70ca902
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/EvaluatorShimConfigFilePath.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter holding the path to the configuration file for the evaluator shim.
+ * Bound to a {@link String}; no default value is declared.
+ */
+@NamedParameter(doc = "The path to the configuration file for the evaluator shim.")
+public final class EvaluatorShimConfigFilePath implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/IsWindows.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/IsWindows.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/IsWindows.java
new file mode 100644
index 0000000..8b26db5
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/IsWindows.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter stating whether or not the Azure Batch VMs are Windows based.
+ * Bound to a {@link Boolean}; no default value is declared.
+ */
+@NamedParameter(doc = "Whether or not the Azure Batch VMs are Windows based.")
+public final class IsWindows implements Name<Boolean> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/package-info.java
new file mode 100644
index 0000000..e00140d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Named parameters for the REEF runtime for Azure Batch.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/AzureBatchFileNames.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/AzureBatchFileNames.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/AzureBatchFileNames.java
new file mode 100644
index 0000000..1eb4e05
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/AzureBatchFileNames.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+
+import javax.inject.Inject;
+
+/**
+ * Central lookup for the file and folder names used by the REEF Azure Batch runtime.
+ */
+@Private
+public final class AzureBatchFileNames {
+
+  private static final String JOB_FOLDER_PREFIX = "apps/reef/jobs/";
+  private static final String TASK_JAR = "local.jar";
+  private static final String EVALUATOR_RESOURCES_JAR = "resources.jar";
+  private static final String SHIM_CONFIGURATION = "shim.conf";
+  private static final String TXT_SUFFIX = ".txt";
+  private static final String PATH_SEPARATOR = "/";
+
+  private final REEFFileNames reefFileNames;
+
+  @Inject
+  private AzureBatchFileNames(final REEFFileNames reefFileNames) {
+    this.reefFileNames = reefFileNames;
+  }
+
+  /**
+   * Builds the storage folder for a job by appending the job ID to the fixed job folder prefix.
+   *
+   * @param jobId the ID of the job.
+   * @return the relative path to the folder storing the job assets.
+   */
+  public String getStorageJobFolder(final String jobId) {
+    return JOB_FOLDER_PREFIX + jobId;
+  }
+
+  /**
+   * @return the evaluator standard error file name supplied by {@link REEFFileNames},
+   * with a ".txt" extension appended.
+   */
+  public String getEvaluatorStdErrFilename() {
+    return this.reefFileNames.getEvaluatorStderrFileName() + TXT_SUFFIX;
+  }
+
+  /**
+   * @return the evaluator standard output file name supplied by {@link REEFFileNames},
+   * with a ".txt" extension appended.
+   */
+  public String getEvaluatorStdOutFilename() {
+    return this.reefFileNames.getEvaluatorStdoutFileName() + TXT_SUFFIX;
+  }
+
+  /**
+   * @return the path to the evaluator shim configuration: the local folder path supplied by
+   * {@link REEFFileNames}, a "/" separator, and the shim configuration file name.
+   */
+  public String getEvaluatorShimConfigurationPath() {
+    return this.reefFileNames.getLocalFolderPath() + PATH_SEPARATOR + SHIM_CONFIGURATION;
+  }
+
+  /**
+   * @return the name of the evaluator resource JAR file.
+   */
+  public String getEvaluatorResourceFilesJarName() {
+    return EVALUATOR_RESOURCES_JAR;
+  }
+
+  /**
+   * @return the file name (without any folder) of the evaluator shim configuration.
+   */
+  public String getEvaluatorShimConfigurationName() {
+    return SHIM_CONFIGURATION;
+  }
+
+  /**
+   * @return the file name under which the task jar will be stored.
+   */
+  public static String getTaskJarFileName() {
+    return TASK_JAR;
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/RemoteIdentifierParser.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/RemoteIdentifierParser.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/RemoteIdentifierParser.java
new file mode 100644
index 0000000..3275648
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/RemoteIdentifierParser.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util;
+
+/**
+ * Static helpers that split a remote identifier into its parts. The identifier is expected to
+ * start with the {@code socket://} prefix and to end with {@code :<port>}.
+ */
+public final class RemoteIdentifierParser {
+
+  private static final String SCHEME_PREFIX = "socket://";
+  private static final char PORT_DELIMITER = ':';
+
+  private RemoteIdentifierParser() {
+  }
+
+  /**
+   * Extracts the IP address: the text between the protocol prefix and the last ':'.
+   *
+   * @param remoteIdentifier the remote identifier.
+   * @return the IP address.
+   */
+  public static String parseIp(final String remoteIdentifier) {
+    final int start = SCHEME_PREFIX.length();
+    final int end = remoteIdentifier.lastIndexOf(PORT_DELIMITER);
+    return remoteIdentifier.substring(start, end);
+  }
+
+  /**
+   * Extracts the port: the text after the last ':' parsed as an integer.
+   *
+   * @param remoteIdentifier the remote identifier.
+   * @return the port.
+   */
+  public static int parsePort(final String remoteIdentifier) {
+    final int delimiterIndex = remoteIdentifier.lastIndexOf(PORT_DELIMITER);
+    final String portText = remoteIdentifier.substring(delimiterIndex + 1);
+    return Integer.parseInt(portText);
+  }
+
+  /**
+   * Extracts the node ID: the remote identifier with the leading protocol prefix characters dropped.
+   *
+   * @param remoteIdentifier the remote identifier.
+   * @return the node ID.
+   */
+  public static String parseNodeId(final String remoteIdentifier) {
+    final int start = SCHEME_PREFIX.length();
+    return remoteIdentifier.substring(start);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/TaskStatusMapper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/TaskStatusMapper.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/TaskStatusMapper.java
new file mode 100644
index 0000000..69d44e9
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/TaskStatusMapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util;
+
+import com.microsoft.azure.batch.protocol.models.CloudTask;
+import com.microsoft.azure.batch.protocol.models.TaskExecutionResult;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
+
+/**
+ * Translates the state of an Azure Batch {@link CloudTask} into the corresponding REEF {@link State}.
+ */
+public final class TaskStatusMapper {
+
+  private TaskStatusMapper() {
+  }
+
+  /**
+   * Maps the state of a {@link CloudTask} to a REEF {@link State}:
+   * ACTIVE becomes INIT, PREPARING and RUNNING become RUNNING, and COMPLETED becomes DONE
+   * when the execution result is SUCCESS and FAILED otherwise.
+   *
+   * @param task the task.
+   * @return the state of the task.
+   * @throws IllegalArgumentException if the task is in any other state.
+   */
+  public static State getReefTaskState(final CloudTask task) {
+    switch (task.state()) {
+    case ACTIVE:
+      return State.INIT;
+    case PREPARING:
+    case RUNNING:
+      return State.RUNNING;
+    case COMPLETED: {
+      final boolean succeeded = task.executionInfo().result() == TaskExecutionResult.SUCCESS;
+      return succeeded ? State.DONE : State.FAILED;
+    }
+    default:
+      throw new IllegalArgumentException("Azure batch cloud task has unknown state: " + task.state());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java
new file mode 100644
index 0000000..e0753df
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.batch;
+
+import com.microsoft.azure.batch.BatchClient;
+import com.microsoft.azure.batch.protocol.models.*;
+
+import org.apache.reef.runtime.azbatch.client.AzureBatchJobSubmissionHandler;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
+import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.azbatch.util.storage.SharedAccessSignatureCloudBlobClientProvider;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A helper class for Azure Batch. It wraps a {@link BatchClient} for the operations used by this runtime:
+ * creating a job with its job manager task, adding a task to a job, and listing the tasks of a job.
+ */
+public final class AzureBatchHelper {
+
+  // NOTE(review): the logger is named after AzureBatchJobSubmissionHandler rather than this class;
+  // confirm whether that is intentional before renaming it.
+  private static final Logger LOG = Logger.getLogger(AzureBatchJobSubmissionHandler.class.getName());
+
+  /*
+   * Environment variable that contains the Azure Batch jobId.
+   */
+  private static final String AZ_BATCH_JOB_ID_ENV = "AZ_BATCH_JOB_ID";
+
+  private final AzureBatchFileNames azureBatchFileNames;
+
+  private final BatchClient client;
+  private final PoolInformation poolInfo;
+
+  /**
+   * Opens a {@link BatchClient} with the supplied credentials and records the pool to submit jobs to.
+   *
+   * @param azureBatchFileNames provider of the file names and paths used for task resource files.
+   * @param credentialProvider source of the credentials used to open the Batch client.
+   * @param azureBatchPoolId the ID of the Azure Batch pool that jobs are created in.
+   */
+  @Inject
+  public AzureBatchHelper(
+      final AzureBatchFileNames azureBatchFileNames,
+      final IAzureBatchCredentialProvider credentialProvider,
+      @Parameter(AzureBatchPoolId.class) final String azureBatchPoolId) {
+    this.azureBatchFileNames = azureBatchFileNames;
+
+    this.client = BatchClient.open(credentialProvider.getCredentials());
+    this.poolInfo = new PoolInformation().withPoolId(azureBatchPoolId);
+  }
+
+  /**
+   * Create a job on Azure Batch. The job and its job manager task both use the application ID as their ID.
+   *
+   * @param applicationId the ID of the application.
+   * @param storageContainerSAS the storage container SAS value; it is passed to the job manager task
+   *                            through an environment variable.
+   * @param jobJarUri the uri of the job jar, downloaded to the task under the task jar file name.
+   * @param command the command line to execute the job manager task with.
+   * @throws IOException if the call to Azure Batch fails.
+   */
+  public void submitJob(final String applicationId, final String storageContainerSAS, final URI jobJarUri,
+                        final String command) throws IOException {
+    final ResourceFile jarResourceFile = new ResourceFile()
+        .withBlobSource(jobJarUri.toString())
+        .withFilePath(AzureBatchFileNames.getTaskJarFileName());
+
+    // This setting will signal Batch to generate an access token and pass it to the Job Manager Task (aka the Driver)
+    // as an environment variable.
+    // See https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.batch.cloudtask.authenticationtokensettings
+    // for more info.
+    final AuthenticationTokenSettings authenticationTokenSettings = new AuthenticationTokenSettings();
+    authenticationTokenSettings.withAccess(Collections.singletonList(AccessScope.JOB));
+
+    final EnvironmentSetting environmentSetting = new EnvironmentSetting()
+        .withName(SharedAccessSignatureCloudBlobClientProvider.AZURE_STORAGE_CONTAINER_SAS_TOKEN_ENV)
+        .withValue(storageContainerSAS);
+
+    final JobManagerTask jobManagerTask = new JobManagerTask()
+        .withRunExclusive(false)
+        .withId(applicationId)
+        .withResourceFiles(Collections.singletonList(jarResourceFile))
+        .withEnvironmentSettings(Collections.singletonList(environmentSetting))
+        .withAuthenticationTokenSettings(authenticationTokenSettings)
+        .withCommandLine(command);
+
+    LOG.log(Level.INFO, "Job Manager (aka driver) task command: " + command);
+
+    final JobAddParameter jobAddParameter = new JobAddParameter()
+        .withId(applicationId)
+        .withJobManagerTask(jobManagerTask)
+        .withPoolInfo(this.poolInfo);
+
+    this.client.jobOperations().createJob(jobAddParameter);
+  }
+
+  /**
+   * Adds a single task to a job on Azure Batch. The task is given two resource files: the job jar
+   * and the evaluator shim configuration.
+   *
+   * @param jobId the ID of the job.
+   * @param taskId the ID of the task.
+   * @param jobJarUri the uri of the job jar, downloaded to the task under the task jar file name.
+   * @param confUri the uri of the configuration, downloaded to the evaluator shim configuration path.
+   * @param command the command line to execute the task with.
+   * @throws IOException if the call to Azure Batch fails.
+   */
+  public void submitTask(final String jobId, final String taskId, final URI jobJarUri,
+                         final URI confUri, final String command)
+      throws IOException {
+
+    final List<ResourceFile> resources = new ArrayList<>();
+
+    final ResourceFile jarSourceFile = new ResourceFile()
+        .withBlobSource(jobJarUri.toString())
+        .withFilePath(AzureBatchFileNames.getTaskJarFileName());
+    resources.add(jarSourceFile);
+
+    final ResourceFile confSourceFile = new ResourceFile()
+        .withBlobSource(confUri.toString())
+        .withFilePath(this.azureBatchFileNames.getEvaluatorShimConfigurationPath());
+    resources.add(confSourceFile);
+
+    LOG.log(Level.INFO, "Evaluator task command: " + command);
+
+    final TaskAddParameter taskAddParameter = new TaskAddParameter()
+        .withId(taskId)
+        .withResourceFiles(resources)
+        .withCommandLine(command);
+
+    this.client.taskOperations().createTask(jobId, taskAddParameter);
+  }
+
+  /**
+   * List the tasks of the specified job.
+   *
+   * @param jobId the ID of the job.
+   * @return A list of CloudTask objects, or {@code null} if listing the tasks failed; the failure is
+   * logged and not rethrown.
+   */
+  public List<CloudTask> getTaskStatusForJob(final String jobId) {
+    List<CloudTask> tasks = null;
+    try {
+      tasks = this.client.taskOperations().listTasks(jobId);
+      LOG.log(Level.INFO, "Task status for job: {0} returned {1} tasks", new Object[]{jobId, tasks.size()});
+    } catch (IOException | BatchErrorException ex) {
+      // Pass the exception itself to the logger so that its message and stack trace are recorded.
+      // The previous pattern used "[2]" instead of "{2}" and supplied a StackTraceElement[] as a
+      // message parameter, so the stack trace never reached the log.
+      LOG.log(Level.SEVERE, "Exception when fetching Task status for job: " + jobId, ex);
+    }
+
+    return tasks;
+  }
+
+  /**
+   * @return the job ID specified in the current system environment, as read from the
+   * AZ_BATCH_JOB_ID environment variable.
+   */
+  public String getAzureBatchJobId() {
+    return System.getenv(AZ_BATCH_JOB_ID_ENV);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/IAzureBatchCredentialProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/IAzureBatchCredentialProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/IAzureBatchCredentialProvider.java
new file mode 100644
index 0000000..681337d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/IAzureBatchCredentialProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.batch;
+
+import com.microsoft.azure.batch.auth.BatchCredentials;
+
+/**
+ * An interface for Azure Batch credential providers: a single accessor that supplies the
+ * {@link BatchCredentials} for an Azure Batch account.
+ */
+public interface IAzureBatchCredentialProvider {
+
+ /**
+ * Returns {@link BatchCredentials} for the Azure Batch account.
+ *
+ * @return an implementation of {@link BatchCredentials}.
+ */
+ BatchCredentials getCredentials();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/SharedKeyBatchCredentialProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/SharedKeyBatchCredentialProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/SharedKeyBatchCredentialProvider.java
new file mode 100644
index 0000000..3e40319
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/SharedKeyBatchCredentialProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.batch;
+
+import com.microsoft.azure.batch.auth.BatchCredentials;
+import com.microsoft.azure.batch.auth.BatchSharedKeyCredentials;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountKey;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountName;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountUri;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * {@link IAzureBatchCredentialProvider} that builds {@link BatchSharedKeyCredentials} from the
+ * configured Azure Batch account URI, account name and account key.
+ */
+@Private
+public final class SharedKeyBatchCredentialProvider implements IAzureBatchCredentialProvider {
+
+  private final String accountUri;
+  private final String accountName;
+  private final String accountKey;
+
+  @Inject
+  private SharedKeyBatchCredentialProvider(
+      @Parameter(AzureBatchAccountUri.class) final String azureBatchAccountUri,
+      @Parameter(AzureBatchAccountName.class) final String azureBatchAccountName,
+      @Parameter(AzureBatchAccountKey.class) final String azureBatchAccountKey) {
+    this.accountUri = azureBatchAccountUri;
+    this.accountName = azureBatchAccountName;
+    this.accountKey = azureBatchAccountKey;
+  }
+
+  /**
+   * Creates shared key credentials for the Azure Batch account.
+   *
+   * @return a new {@link BatchSharedKeyCredentials} built from the {@link AzureBatchAccountUri},
+   * {@link AzureBatchAccountName} and {@link AzureBatchAccountKey} parameters.
+   */
+  @Override
+  public BatchCredentials getCredentials() {
+    final BatchCredentials sharedKeyCredentials =
+        new BatchSharedKeyCredentials(this.accountUri, this.accountName, this.accountKey);
+    return sharedKeyCredentials;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/TokenBatchCredentialProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/TokenBatchCredentialProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/TokenBatchCredentialProvider.java
new file mode 100644
index 0000000..a3b8caa
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/TokenBatchCredentialProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.batch;
+
+import com.microsoft.azure.batch.auth.BatchCredentials;
+import com.microsoft.rest.credentials.TokenCredentials;
+import okhttp3.OkHttpClient;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountUri;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Credential provider for code that runs as a task inside Azure Batch. This
+ * {@link IAzureBatchCredentialProvider} builds its credentials from the authentication token found in the
+ * AZ_BATCH_AUTHENTICATION_TOKEN environment variable. Azure Batch populates that variable so that the task
+ * can perform certain operations on its own job.
+ *
+ * See: https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.batch.cloudtask.authenticationtokensettings
+ * for more info.
+ */
+public class TokenBatchCredentialProvider implements IAzureBatchCredentialProvider {
+
+  private static final String AZ_BATCH_AUTH_TOKEN_ENV = "AZ_BATCH_AUTHENTICATION_TOKEN";
+
+  private final String azureBatchAccountUri;
+
+  /**
+   * Injectable constructor.
+   *
+   * @param azureBatchAccountUri the value bound to {@link AzureBatchAccountUri}; it is reported as the
+   *                             base URL of the credentials.
+   */
+  @Inject
+  TokenBatchCredentialProvider(@Parameter(AzureBatchAccountUri.class) final String azureBatchAccountUri) {
+    this.azureBatchAccountUri = azureBatchAccountUri;
+  }
+
+  /**
+   * Creates token based credentials for the Azure Batch account.
+   * The environment variable is read on every call and its value is used as is: no check is made that
+   * the variable is actually set.
+   *
+   * @return a {@link BatchCredentials} implementation which reports the account URI as its base URL and
+   * delegates request authentication to {@link TokenCredentials} created from the Azure Batch token.
+   */
+  @Override
+  public BatchCredentials getCredentials() {
+    final String authenticationToken = System.getenv(AZ_BATCH_AUTH_TOKEN_ENV);
+    // The scheme argument is left as null, exactly as the token is handed over unmodified.
+    final TokenCredentials delegate = new TokenCredentials(null, authenticationToken);
+    final String accountUri = this.azureBatchAccountUri;
+
+    final BatchCredentials tokenBasedCredentials = new BatchCredentials() {
+      @Override
+      public String baseUrl() {
+        return accountUri;
+      }
+
+      @Override
+      public void applyCredentialsFilter(final OkHttpClient.Builder builder) {
+        delegate.applyCredentialsFilter(builder);
+      }
+    };
+    return tokenBasedCredentials;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/package-info.java
new file mode 100644
index 0000000..830749e
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Azure Batch utilities for REEF Azure Batch runtime.
+ */
+package org.apache.reef.runtime.azbatch.util.batch;
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/AbstractCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/AbstractCommandBuilder.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/AbstractCommandBuilder.java
new file mode 100644
index 0000000..9a6cc15
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/AbstractCommandBuilder.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.command;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.driver.evaluator.EvaluatorProcess;
+import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.files.RuntimePathProvider;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Base class of the OS specific command builders. It assembles the launch commands and leaves
+ * the classpath formatting to the subclasses.
+ */
+public abstract class AbstractCommandBuilder implements CommandBuilder {
+
+  private final Class launcherClass;
+  private final Class shimLauncherClass;
+  private final List<String> commandListPrefix;
+  private final String osCommandFormat;
+  private final RuntimePathProvider runtimePathProvider;
+
+  protected final ClasspathProvider classpathProvider;
+  protected final REEFFileNames reefFileNames;
+  protected final AzureBatchFileNames azureBatchFileNames;
+
+  /**
+   * Stores the OS specific settings and the shared collaborators.
+   *
+   * @param launcherClass the class handed to the launch command builder for the driver command.
+   * @param shimLauncherClass the class handed to the launch command builder for the evaluator shim command.
+   * @param commandListPrefix the command prefix handed to the launch command builder.
+   * @param osCommandFormat a format string with a single placeholder which receives the joined
+   *                        driver or evaluator shim command.
+   * @param classpathProvider the source of the classpath entries, for use by subclasses.
+   * @param runtimePathProvider the source of the Java path used in the launch commands.
+   * @param reefFileNames the source of the REEF configuration file paths.
+   * @param azureBatchFileNames the Azure Batch specific file names, for use by subclasses.
+   */
+  AbstractCommandBuilder(
+      final Class launcherClass,
+      final Class shimLauncherClass,
+      final List<String> commandListPrefix,
+      final String osCommandFormat,
+      final ClasspathProvider classpathProvider,
+      final RuntimePathProvider runtimePathProvider,
+      final REEFFileNames reefFileNames,
+      final AzureBatchFileNames azureBatchFileNames) {
+    this.classpathProvider = classpathProvider;
+    this.runtimePathProvider = runtimePathProvider;
+    this.reefFileNames = reefFileNames;
+    this.azureBatchFileNames = azureBatchFileNames;
+
+    this.launcherClass = launcherClass;
+    this.shimLauncherClass = shimLauncherClass;
+    this.commandListPrefix = commandListPrefix;
+    this.osCommandFormat = osCommandFormat;
+  }
+
+  /**
+   * Builds the driver command from the driver configuration path, the driver classpath and the
+   * driver memory of the submission event, and wraps it into the OS command format.
+   *
+   * @param jobSubmissionEvent the submission event which supplies the driver memory.
+   * @return the command string.
+   */
+  @Override
+  public String buildDriverCommand(final JobSubmissionEvent jobSubmissionEvent) {
+    final List<String> driverCommand = new JavaLaunchCommandBuilder(this.launcherClass, this.commandListPrefix)
+        .setJavaPath(this.runtimePathProvider.getPath())
+        .setConfigurationFilePaths(Collections.singletonList(this.reefFileNames.getDriverConfigurationPath()))
+        .setClassPath(getDriverClasspath())
+        .setMemory(jobSubmissionEvent.getDriverMemory().get())
+        .build();
+    return formatOsCommand(driverCommand);
+  }
+
+  /**
+   * Builds the evaluator shim command from the given configuration path, the evaluator shim
+   * classpath and the given memory, and wraps it into the OS command format.
+   *
+   * @param evaluatorShimMemory the memory value handed to the launch command builder.
+   * @param configurationPath the path of the configuration file handed to the launch command builder.
+   * @return the command string.
+   */
+  @Override
+  public String buildEvaluatorShimCommand(final int evaluatorShimMemory, final String configurationPath) {
+    final List<String> shimCommand = new JavaLaunchCommandBuilder(this.shimLauncherClass, this.commandListPrefix)
+        .setJavaPath(this.runtimePathProvider.getPath())
+        .setConfigurationFilePaths(Collections.singletonList(configurationPath))
+        .setClassPath(getEvaluatorShimClasspath())
+        .setMemory(evaluatorShimMemory)
+        .build();
+    return formatOsCommand(shimCommand);
+  }
+
+  /**
+   * Builds the evaluator command from the process of the launch event. Neither the command prefix
+   * nor the OS command format is applied here: the command line of the process is joined with
+   * spaces and returned as is.
+   *
+   * @param resourceLaunchEvent the launch event which supplies the evaluator process.
+   * @param containerMemory the container memory; only used when the process has no option set.
+   * @param jvmHeapFactor the factor the container memory is multiplied with to get the process memory;
+   *                      only used when the process has no option set.
+   * @return the command string.
+   */
+  @Override
+  public String buildEvaluatorCommand(final ResourceLaunchEvent resourceLaunchEvent,
+                                      final int containerMemory, final double jvmHeapFactor) {
+    // Use EvaluatorProcess to be compatible with JVMProcess and CLRProcess
+    final EvaluatorProcess process = resourceLaunchEvent.getProcess()
+        .setConfigurationFileName(this.reefFileNames.getEvaluatorConfigurationPath());
+
+    // The memory is only derived from the container size if the process does not report its option as set.
+    final EvaluatorProcess launchableProcess = process.isOptionSet()
+        ? process
+        : process.setMemory((int) (jvmHeapFactor * containerMemory));
+
+    final List<String> evaluatorCommand = new ArrayList<>(launchableProcess.getCommandLine());
+    return StringUtils.join(evaluatorCommand, ' ');
+  }
+
+  /**
+   * Joins the command parts with spaces and inserts the result into the OS command format.
+   *
+   * @param commandParts the parts of the command.
+   * @return the formatted OS command.
+   */
+  private String formatOsCommand(final List<String> commandParts) {
+    return String.format(this.osCommandFormat, StringUtils.join(commandParts, ' '));
+  }
+
+  /**
+   * Provides the driver classpath in the form the target OS expects.
+   *
+   * @return classpath parameter string.
+   */
+  protected abstract String getDriverClasspath();
+
+  /**
+   * Provides the evaluator shim classpath in the form the target OS expects.
+   *
+   * @return classpath parameter string.
+   */
+  protected abstract String getEvaluatorShimClasspath();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java
new file mode 100644
index 0000000..96cd716
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.command;
+
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
+
+/**
+ * Builder of the launch commands of the REEF processes which run on Azure Batch.
+ */
+public interface CommandBuilder {
+
+  /**
+   * Builds the command which starts the Driver.
+   *
+   * @param jobSubmissionEvent the submission event the command is built for.
+   * @return the command string.
+   */
+  String buildDriverCommand(JobSubmissionEvent jobSubmissionEvent);
+
+  /**
+   * Builds the command which starts the Evaluator Shim.
+   *
+   * @param evaluatorShimMemory the maximum size of the memory allocation pool of the Evaluator Shim.
+   *                            NOTE(review): this was documented as bytes - confirm the unit against the
+   *                            launch command builder which receives the value.
+   * @param configurationPath the relative path to the configuration file.
+   * @return the command string.
+   */
+  String buildEvaluatorShimCommand(int evaluatorShimMemory, String configurationPath);
+
+  /**
+   * Builds the command which starts the Evaluator.
+   *
+   * @param resourceLaunchEvent the launch event the command is built for.
+   * @param containerMemory the memory size of the container the Evaluator runs in.
+   *                        NOTE(review): this was documented as bytes - confirm the unit against the
+   *                        evaluator process which receives the derived value.
+   * @param jvmHeapFactor the factor which is multiplied with the container memory to derive the
+   *                      memory of the Evaluator process.
+   * @return the command string.
+   */
+  String buildEvaluatorCommand(ResourceLaunchEvent resourceLaunchEvent,
+                               int containerMemory, double jvmHeapFactor);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java
new file mode 100644
index 0000000..adb58f5
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.command;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.evaluator.EvaluatorShimLauncher;
+import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.common.REEFLauncher;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.files.RuntimePathProvider;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Command builder for the Java REEF processes which run on Azure Batch Linux pools.
+ * Commands are prefixed with an unzip of the task jar and are run through /bin/sh.
+ */
+@Private
+public class LinuxCommandBuilder extends AbstractCommandBuilder {
+
+  private static final char CLASSPATH_SEPARATOR = ':';
+  private static final String SHELL_COMMAND_FORMAT = "/bin/sh -c \"%s\"";
+
+  // Extracts the task jar into the 'reef/' folder before the Java process is started.
+  private static final String UNZIP_TASK_JAR_COMMAND =
+      "unzip " + AzureBatchFileNames.getTaskJarFileName() + " -d 'reef/';";
+  private static final List<String> UNZIP_COMMAND_PREFIX =
+      Collections.unmodifiableList(Arrays.asList(UNZIP_TASK_JAR_COMMAND));
+
+  @Inject
+  LinuxCommandBuilder(
+      final ClasspathProvider classpathProvider,
+      final RuntimePathProvider runtimePathProvider,
+      final REEFFileNames reefFileNames,
+      final AzureBatchFileNames azureBatchFileNames) {
+    super(
+        REEFLauncher.class,
+        EvaluatorShimLauncher.class,
+        UNZIP_COMMAND_PREFIX,
+        SHELL_COMMAND_FORMAT,
+        classpathProvider,
+        runtimePathProvider,
+        reefFileNames,
+        azureBatchFileNames);
+  }
+
+  /**
+   * Joins the driver classpath entries with ':'.
+   *
+   * @return classpath parameter string.
+   */
+  @Override
+  protected String getDriverClasspath() {
+    return StringUtils.join(this.classpathProvider.getDriverClasspath(), CLASSPATH_SEPARATOR);
+  }
+
+  /**
+   * Joins the evaluator classpath entries with ':'; the evaluator shim uses the evaluator classpath.
+   *
+   * @return classpath parameter string.
+   */
+  @Override
+  protected String getEvaluatorShimClasspath() {
+    return StringUtils.join(this.classpathProvider.getEvaluatorClasspath(), CLASSPATH_SEPARATOR);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java
new file mode 100644
index 0000000..f382bef
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.command;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.evaluator.EvaluatorShimLauncher;
+import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.common.REEFLauncher;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.files.RuntimePathProvider;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Command builder for the Java REEF processes which run on Azure Batch Windows pools.
+ * Commands are prefixed with a PowerShell extraction of the task jar and are run through powershell.exe.
+ */
+@Private
+public class WindowsCommandBuilder extends AbstractCommandBuilder {
+
+  private static final char CLASSPATH_SEPARATOR = ';';
+  private static final String POWERSHELL_COMMAND_FORMAT = "powershell.exe /c \"%s\";";
+
+  // Extracts the task jar from the task working directory into its 'reef' sub folder.
+  private static final List<String> EXTRACT_COMMAND_PREFIX = Collections.unmodifiableList(
+      Arrays.asList(
+          "Add-Type -AssemblyName System.IO.Compression.FileSystem; ",
+          "[System.IO.Compression.ZipFile]::ExtractToDirectory(\\\"$env:AZ_BATCH_TASK_WORKING_DIR\\" +
+              AzureBatchFileNames.getTaskJarFileName() + "\\\", " +
+              "\\\"$env:AZ_BATCH_TASK_WORKING_DIR\\reef\\\"); ")
+  );
+
+  @Inject
+  WindowsCommandBuilder(
+      final ClasspathProvider classpathProvider,
+      final RuntimePathProvider runtimePathProvider,
+      final REEFFileNames reefFileNames,
+      final AzureBatchFileNames azureBatchFileNames) {
+    super(
+        REEFLauncher.class,
+        EvaluatorShimLauncher.class,
+        EXTRACT_COMMAND_PREFIX,
+        POWERSHELL_COMMAND_FORMAT,
+        classpathProvider,
+        runtimePathProvider,
+        reefFileNames,
+        azureBatchFileNames);
+  }
+
+  /**
+   * Joins the driver classpath entries with ';' and puts the result into single quotes.
+   *
+   * @return classpath parameter string.
+   */
+  @Override
+  protected String getDriverClasspath() {
+    final String joinedClasspath =
+        StringUtils.join(this.classpathProvider.getDriverClasspath(), CLASSPATH_SEPARATOR);
+    return singleQuoted(joinedClasspath);
+  }
+
+  /**
+   * Joins the evaluator classpath entries with ';' and puts the result into single quotes;
+   * the evaluator shim uses the evaluator classpath.
+   *
+   * @return classpath parameter string.
+   */
+  @Override
+  protected String getEvaluatorShimClasspath() {
+    final String joinedClasspath =
+        StringUtils.join(this.classpathProvider.getEvaluatorClasspath(), CLASSPATH_SEPARATOR);
+    return singleQuoted(joinedClasspath);
+  }
+
+  /**
+   * Surrounds the given text with single quotes.
+   *
+   * @param text the text to quote.
+   * @return the quoted text.
+   */
+  private static String singleQuoted(final String text) {
+    return String.format("'%s'", text);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/package-info.java
new file mode 100644
index 0000000..edd04b9
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Command line utilities for Azure Batch runtime.
+ */
+package org.apache.reef.runtime.azbatch.util.command;
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/package-info.java
new file mode 100644
index 0000000..622e972
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Utility classes for the REEF Azure Batch runtime.
+ */
+package org.apache.reef.runtime.azbatch.util;
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/AzureStorageClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/AzureStorageClient.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/AzureStorageClient.java
new file mode 100644
index 0000000..83b522b
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/AzureStorageClient.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.storage;
+
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.*;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.parameters.AzureStorageBlobSASTokenValidityHours;
+import org.apache.reef.runtime.azbatch.parameters.AzureStorageContainerName;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Azure storage utility to upload Driver and Evaluator jars to blob storage
+ * and generate SAS URIs.
+ */
+@Private
+public final class AzureStorageClient {
+  private static final Logger LOG = Logger.getLogger(AzureStorageClient.class.getName());
+
+  /**
+   * Minutes by which the start time of the generated shared access policies is moved into the past.
+   * A start time of exactly "now" can be rejected by the storage service when the local clock runs
+   * ahead of the service clock, so a safety margin is applied.
+   */
+  private static final int SAS_START_TIME_SKEW_MINUTES = 15;
+
+  private final ICloudBlobClientProvider cloudBlobClientProvider;
+
+  private final String azureStorageContainerName;
+  private final int blobSASTokenValidityHours;
+
+  @Inject
+  AzureStorageClient(
+      final ICloudBlobClientProvider cloudBlobClientProvider,
+      @Parameter(AzureStorageContainerName.class) final String azureStorageContainerName,
+      @Parameter(AzureStorageBlobSASTokenValidityHours.class) final int blobSASTokenValidityHours) {
+    this.cloudBlobClientProvider = cloudBlobClientProvider;
+    this.azureStorageContainerName = azureStorageContainerName;
+    this.blobSASTokenValidityHours = blobSASTokenValidityHours;
+  }
+
+  /**
+   * Resolves the URI of a job folder within the configured storage container.
+   *
+   * @param jobFolder the path to the job folder within the storage container.
+   * @return the URI of the directory reference for the job folder.
+   * @throws IOException if the blob client cannot be obtained or the container reference cannot be resolved.
+   */
+  public URI getJobSubmissionFolderUri(final String jobFolder) throws IOException {
+    final CloudBlobClient cloudBlobClient = this.cloudBlobClientProvider.getCloudBlobClient();
+    try {
+      final CloudBlobContainer container = cloudBlobClient.getContainerReference(this.azureStorageContainerName);
+      return container.getDirectoryReference(jobFolder).getUri();
+    } catch (URISyntaxException | StorageException e) {
+      throw new IOException("Failed to get the job submission folder URI", e);
+    }
+  }
+
+  /**
+   * Creates the configured storage container if it does not exist yet and generates a shared access
+   * signature for it which grants read and write permissions.
+   *
+   * @return the shared access signature of the storage container.
+   * @throws IOException if the blob client cannot be obtained, or the container cannot be created,
+   *                     or the signature cannot be generated.
+   */
+  public String createContainerSharedAccessSignature() throws IOException {
+    try {
+      final CloudBlobClient cloudBlobClient = this.cloudBlobClientProvider.getCloudBlobClient();
+      final CloudBlobContainer cloudBlobContainer =
+          cloudBlobClient.getContainerReference(this.azureStorageContainerName);
+      cloudBlobContainer.createIfNotExists();
+
+      return cloudBlobContainer.generateSharedAccessSignature(getSharedAccessContainerPolicy(), null);
+
+    } catch (StorageException | URISyntaxException | InvalidKeyException e) {
+      throw new IOException("Failed to generate a shared access signature for storage container.", e);
+    }
+  }
+
+  /**
+   * Upload a file to the storage account.
+   *
+   * @param jobFolder the path to the destination folder within storage container.
+   * @param file the source file.
+   * @return the SAS URI to the uploaded file; the signature grants read permission.
+   * @throws IOException if the file cannot be read, or the upload or the signature generation fails.
+   */
+  public URI uploadFile(final String jobFolder, final File file) throws IOException {
+
+    LOG.log(Level.INFO, "Uploading [{0}] to [{1}]", new Object[]{file, jobFolder});
+
+    try {
+      final CloudBlobClient cloudBlobClient = this.cloudBlobClientProvider.getCloudBlobClient();
+      final CloudBlobContainer container = cloudBlobClient.getContainerReference(this.azureStorageContainerName);
+
+      final String destination = String.format("%s/%s", jobFolder, file.getName());
+      final CloudBlockBlob blob = container.getBlockBlobReference(destination);
+
+      try (FileInputStream fis = new FileInputStream(file)) {
+        blob.upload(fis, file.length());
+      }
+
+      LOG.log(Level.FINE, "Uploaded to: {0}", blob.getStorageUri().getPrimaryUri());
+      return this.cloudBlobClientProvider.generateSharedAccessSignature(blob, getSharedAccessBlobPolicy());
+
+    } catch (final URISyntaxException | StorageException e) {
+      // Name the file and the destination, as the other methods of this class describe their failures.
+      throw new IOException("Failed to upload [" + file + "] to [" + jobFolder + "]", e);
+    }
+  }
+
+  /**
+   * @return a policy which grants read permission; used for the uploaded blobs.
+   */
+  private SharedAccessBlobPolicy getSharedAccessBlobPolicy() {
+    return getSharedAccessBlobPolicy(EnumSet.of(SharedAccessBlobPermissions.READ));
+  }
+
+  /**
+   * @return a policy which grants read and write permissions; used for the storage container.
+   */
+  private SharedAccessBlobPolicy getSharedAccessContainerPolicy() {
+    return getSharedAccessBlobPolicy(EnumSet.of(SharedAccessBlobPermissions.READ, SharedAccessBlobPermissions.WRITE));
+  }
+
+  /**
+   * Creates a policy with the given permissions. The policy expires the configured number of hours
+   * after the current time and starts {@link #SAS_START_TIME_SKEW_MINUTES} minutes before it.
+   *
+   * @param permissions the permissions granted by the policy.
+   * @return the shared access policy.
+   */
+  private SharedAccessBlobPolicy getSharedAccessBlobPolicy(
+      final EnumSet<SharedAccessBlobPermissions> permissions) {
+
+    // Both ends of the validity window are derived from the same instant.
+    final Calendar startTime = Calendar.getInstance();
+    final Calendar expiryTime = (Calendar) startTime.clone();
+
+    // Back-date the start so that a token is not rejected as "not yet valid" because of clock skew.
+    startTime.add(Calendar.MINUTE, -SAS_START_TIME_SKEW_MINUTES);
+    expiryTime.add(Calendar.HOUR, this.blobSASTokenValidityHours);
+    final Date tokenExpirationDate = expiryTime.getTime();
+
+    final SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy();
+    policy.setPermissions(permissions);
+    policy.setSharedAccessStartTime(startTime.getTime());
+    policy.setSharedAccessExpiryTime(tokenExpirationDate);
+
+    return policy;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/ICloudBlobClientProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/ICloudBlobClientProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/ICloudBlobClientProvider.java
new file mode 100644
index 0000000..d51dbc2
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/ICloudBlobClientProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.storage;
+
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+import com.microsoft.windowsazure.storage.blob.CloudBlobClient;
+import com.microsoft.windowsazure.storage.blob.SharedAccessBlobPolicy;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Contract of the classes which supply a {@link CloudBlobClient} for the authentication
+ * mechanism that is available to them.
+ */
+public interface ICloudBlobClientProvider {
+
+  /**
+   * Supplies a {@link CloudBlobClient} which is based on the available authentication mechanism.
+   *
+   * @return an instance of {@link CloudBlobClient}.
+   * @throws IOException if the client cannot be provided.
+   */
+  CloudBlobClient getCloudBlobClient() throws IOException;
+
+  /**
+   * Creates a Shared Access Signature (SAS) URI for the given {@link CloudBlob}.
+   *
+   * @param cloudBlob the cloud blob the SAS URI is created for.
+   * @param policy the {@link SharedAccessBlobPolicy} which defines the permissions and the
+   *               validity time period of the signature.
+   * @return the SAS URI for the given {@link CloudBlob}.
+   * @throws IOException if the SAS URI cannot be generated.
+   */
+  URI generateSharedAccessSignature(CloudBlob cloudBlob, SharedAccessBlobPolicy policy)
+      throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java
new file mode 100644
index 0000000..e678445
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.storage;
+
+import com.microsoft.windowsazure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+import com.microsoft.windowsazure.storage.blob.CloudBlobClient;
+import com.microsoft.windowsazure.storage.blob.SharedAccessBlobPolicy;
+import com.microsoft.windowsazure.storage.core.PathUtility;
+import com.microsoft.windowsazure.storage.core.UriQueryBuilder;
+import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountName;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Cloud Blob client provider that uses Azure Storage Shared Access Signature authorization.
+ * The container SAS token is read once, at construction time, from the environment variable
+ * named by {@link #AZURE_STORAGE_CONTAINER_SAS_TOKEN_ENV}.
+ */
+public final class SharedAccessSignatureCloudBlobClientProvider implements ICloudBlobClientProvider {
+
+  private static final Logger LOG =
+      Logger.getLogger(SharedAccessSignatureCloudBlobClientProvider.class.getName());
+
+  /** Name of the environment variable that holds the container SAS token. */
+  public static final String AZURE_STORAGE_CONTAINER_SAS_TOKEN_ENV = "AZURE_STORAGE_CONTAINER_SAS_TOKEN_ENV";
+
+  private static final String AZURE_STORAGE_ACCOUNT_URI_FORMAT = "https://%s.blob.core.windows.net";
+
+  private final String azureStorageAccountName;
+  private final String azureStorageContainerSASToken;
+
+  @Inject
+  SharedAccessSignatureCloudBlobClientProvider(
+      @Parameter(AzureStorageAccountName.class) final String azureStorageAccountName) {
+    this.azureStorageAccountName = azureStorageAccountName;
+    this.azureStorageContainerSASToken = System.getenv(AZURE_STORAGE_CONTAINER_SAS_TOKEN_ENV);
+  }
+
+  /**
+   * Returns a {@link CloudBlobClient} for the configured storage account that uses the
+   * container SAS token as its credentials.
+   *
+   * @return an instance of {@link CloudBlobClient}.
+   * @throws IOException if the storage account URI cannot be built.
+   */
+  @Override
+  public CloudBlobClient getCloudBlobClient() throws IOException {
+    final StorageCredentialsSharedAccessSignature signature =
+        new StorageCredentialsSharedAccessSignature(this.azureStorageContainerSASToken);
+    final URI storageAccountUri;
+    try {
+      storageAccountUri = new URI(String.format(AZURE_STORAGE_ACCOUNT_URI_FORMAT, this.azureStorageAccountName));
+    } catch (URISyntaxException e) {
+      throw new IOException("Failed to generate Storage Account URI", e);
+    }
+
+    return new CloudBlobClient(storageAccountUri, signature);
+  }
+
+  /**
+   * Builds a URI for the given {@link CloudBlob} by appending the query parameters of the
+   * container SAS token to the blob's primary URI.
+   *
+   * @param cloudBlob cloud blob to create the URI for.
+   * @param policy    not consulted by this implementation; the parameters of the container SAS
+   *                  token are appended unchanged.
+   * @return the primary URI of the blob with the SAS token parameters added.
+   * @throws IOException if the URI cannot be generated.
+   */
+  @Override
+  public URI generateSharedAccessSignature(final CloudBlob cloudBlob, final SharedAccessBlobPolicy policy)
+      throws IOException {
+    try {
+      final URI uri = cloudBlob.getStorageUri().getPrimaryUri();
+
+      final Map<String, String[]> queryString =
+          PathUtility.parseQueryString(this.azureStorageContainerSASToken);
+
+      final UriQueryBuilder builder = new UriQueryBuilder();
+      for (final Map.Entry<String, String[]> entry : queryString.entrySet()) {
+        for (final String value : entry.getValue()) {
+          builder.add(entry.getKey(), value);
+        }
+      }
+
+      final URI result = builder.addToURI(uri);
+
+      // Log only the bare blob URI: the resulting URI embeds the SAS token, which is a
+      // credential and must not end up in log files.
+      LOG.log(Level.FINE, "Generated a Shared Access Signature URI for blob {0}.", uri);
+
+      return result;
+    } catch (StorageException | URISyntaxException e) {
+      throw new IOException("Failed to generate a Shared Access Signature.", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/StorageKeyCloudBlobProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/StorageKeyCloudBlobProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/StorageKeyCloudBlobProvider.java
new file mode 100644
index 0000000..ae8613d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/StorageKeyCloudBlobProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.storage;
+
+import com.microsoft.windowsazure.storage.CloudStorageAccount;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+import com.microsoft.windowsazure.storage.blob.CloudBlobClient;
+import com.microsoft.windowsazure.storage.blob.SharedAccessBlobPolicy;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountKey;
+import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountName;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+/**
+ * Cloud Blob client provider that uses Azure Storage Shared Key authorization.
+ */
+@Private
+public final class StorageKeyCloudBlobProvider implements ICloudBlobClientProvider {
+
+  private static final String AZURE_STORAGE_CONNECTION_STRING_FORMAT =
+      "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+
+  private final String azureStorageAccountName;
+  private final String azureStorageAccountKey;
+
+  @Inject
+  StorageKeyCloudBlobProvider(
+      @Parameter(AzureStorageAccountName.class) final String azureStorageAccountName,
+      @Parameter(AzureStorageAccountKey.class) final String azureStorageAccountKey) {
+    this.azureStorageAccountName = azureStorageAccountName;
+    this.azureStorageAccountKey = azureStorageAccountKey;
+  }
+
+  /**
+   * Returns a {@link CloudBlobClient} created from a connection string that is built out of the
+   * configured storage account name and key.
+   *
+   * @return an instance of {@link CloudBlobClient}.
+   * @throws IOException if the connection string cannot be parsed into a storage account.
+   */
+  @Override
+  public CloudBlobClient getCloudBlobClient() throws IOException {
+    final String connectionString = String.format(AZURE_STORAGE_CONNECTION_STRING_FORMAT,
+        this.azureStorageAccountName, this.azureStorageAccountKey);
+    try {
+      return CloudStorageAccount.parse(connectionString).createCloudBlobClient();
+    } catch (URISyntaxException | InvalidKeyException e) {
+      throw new IOException("Failed to create a Cloud Storage Account.", e);
+    }
+  }
+
+  /**
+   * Asks the given {@link CloudBlob} to generate a Shared Access Signature for the given policy
+   * and appends it, as the query string, to the blob's primary URI.
+   *
+   * @param cloudBlob cloud blob to create the URI for.
+   * @param policy    an instance of {@link SharedAccessBlobPolicy} that specifies permissions and
+   *                  the validity time period of the signature.
+   * @return the primary URI of the blob followed by {@code ?} and the generated signature.
+   * @throws IOException if the signature or the resulting URI cannot be generated.
+   */
+  @Override
+  public URI generateSharedAccessSignature(final CloudBlob cloudBlob, final SharedAccessBlobPolicy policy)
+      throws IOException {
+    try {
+      final String sas = cloudBlob.generateSharedAccessSignature(policy, null);
+      final String uri = cloudBlob.getStorageUri().getPrimaryUri().toString();
+      return new URI(uri + "?" + sas);
+    } catch (StorageException | InvalidKeyException | URISyntaxException e) {
+      throw new IOException("Failed to generate a Shared Access Signature.", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/package-info.java
new file mode 100644
index 0000000..adbe05f
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Azure Storage utilities for Azure Batch runtime.
+ */
+package org.apache.reef.runtime.azbatch.util.storage;
[3/4] reef git commit: [REEF-1965] Implement REEF runtime for Azure
Batch
Posted by mo...@apache.org.
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java
new file mode 100644
index 0000000..96f247a
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import com.microsoft.azure.batch.protocol.models.CloudTask;
+import com.microsoft.azure.batch.protocol.models.TaskState;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.proto.EvaluatorShimProtocol;
+import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.azbatch.util.storage.AzureStorageClient;
+import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
+import org.apache.reef.runtime.azbatch.util.RemoteIdentifierParser;
+import org.apache.reef.runtime.azbatch.util.TaskStatusMapper;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
+import org.apache.reef.runtime.common.files.*;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+import org.apache.reef.wake.remote.impl.SocketRemoteIdentifier;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.apache.reef.runtime.azbatch.driver.RuntimeIdentifier.RUNTIME_NAME;
+
+/**
+ * The Driver's view of evaluator shims running in the cluster. This class serves the following purposes:
+ * 1. listens for evaluator shim status messages signaling that the shim is online and ready to start the evaluator
+ * process.
+ * 2. listens for {@link ResourceLaunchEvent} events and sends commands to the evaluator shims to start the
+ * evaluator process.
+ * 3. listens for {@link ResourceReleaseEvent} events and sends terminate commands to the evaluator shims.
+ * 4. triggers {@link org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent}
+ * events to update REEF Common on runtime status.
+ * 5. triggers {@link org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent}
+ * events to update REEF Common on container statuses.
+ */
+@Private
+@DriverSide
+public final class AzureBatchEvaluatorShimManager
+ implements EventHandler<RemoteMessage<EvaluatorShimProtocol.EvaluatorShimStatusProto>> {
+
+ private static final Logger LOG = Logger.getLogger(AzureBatchEvaluatorShimManager.class.getName());
+
+ /** Memory size, in MB, passed to the command builder when building the evaluator shim launch command. */
+ private static final int EVALUATOR_SHIM_MEMORY_MB = 64;
+
+ /**
+ * Resource requests for which an Azure Batch task was created but no allocation was reported yet,
+ * keyed by container id.
+ */
+ private final Map<String, ResourceRequestEvent> outstandingResourceRequests;
+ /** Counter reported to REEF as the number of outstanding container requests. */
+ private final AtomicInteger outstandingResourceRequestCount;
+
+ /** Azure Batch tasks that were already in COMPLETED state when their allocation was reported, keyed by container id. */
+ private final Map<String, CloudTask> failedResources;
+
+ /** Handle returned when registering this object as a handler for shim status messages; closed in onClose(). */
+ private final AutoCloseable evaluatorShimCommandChannel;
+
+ private final AzureStorageClient azureStorageClient;
+ private final REEFFileNames reefFileNames;
+ private final AzureBatchFileNames azureBatchFileNames;
+ private final RemoteManager remoteManager;
+ private final AzureBatchHelper azureBatchHelper;
+ private final AzureBatchEvaluatorShimConfigurationProvider evaluatorShimConfigurationProvider;
+ private final JobJarMaker jobJarMaker;
+ private final CommandBuilder launchCommandBuilder;
+ private final REEFEventHandlers reefEventHandlers;
+ private final ConfigurationSerializer configurationSerializer;
+
+ /** Used to look up the evaluator manager that belongs to a resource id. */
+ private final Evaluators evaluators;
+
+ /**
+  * Creates the manager and registers it with the {@link RemoteManager} as the handler for
+  * evaluator shim status messages.
+  */
+ @Inject
+ AzureBatchEvaluatorShimManager(
+     final AzureStorageClient azureStorageClient,
+     final REEFFileNames reefFileNames,
+     final AzureBatchFileNames azureBatchFileNames,
+     final RemoteManager remoteManager,
+     final REEFEventHandlers reefEventHandlers,
+     final Evaluators evaluators,
+     final CommandBuilder launchCommandBuilder,
+     final AzureBatchHelper azureBatchHelper,
+     final JobJarMaker jobJarMaker,
+     final AzureBatchEvaluatorShimConfigurationProvider evaluatorShimConfigurationProvider,
+     final ConfigurationSerializer configurationSerializer) {
+   this.azureStorageClient = azureStorageClient;
+   this.reefFileNames = reefFileNames;
+   this.azureBatchFileNames = azureBatchFileNames;
+   this.remoteManager = remoteManager;
+
+   this.reefEventHandlers = reefEventHandlers;
+   this.evaluators = evaluators;
+
+   this.launchCommandBuilder = launchCommandBuilder;
+
+   this.azureBatchHelper = azureBatchHelper;
+   this.jobJarMaker = jobJarMaker;
+
+   this.evaluatorShimConfigurationProvider = evaluatorShimConfigurationProvider;
+   this.configurationSerializer = configurationSerializer;
+
+   this.outstandingResourceRequests = new ConcurrentHashMap<>();
+   this.outstandingResourceRequestCount = new AtomicInteger();
+
+   this.failedResources = new ConcurrentHashMap<>();
+
+   // Register last: registerHandler() receives a reference to this object, so every other field
+   // must already be assigned before a status message can be dispatched to onNext().
+   this.evaluatorShimCommandChannel = remoteManager
+       .registerHandler(EvaluatorShimProtocol.EvaluatorShimStatusProto.class, this);
+ }
+
+ /**
+  * This method is called when a resource is requested. It will add a task to the existing Azure Batch job which
+  * is equivalent to requesting a container in Azure Batch. When the request is fulfilled and the evaluator shim is
+  * started, it will send a message back to the driver which signals that a resource request was fulfilled.
+  *
+  * @param resourceRequestEvent resource request event.
+  * @param containerId container id for the resource. It will be used as the task id for Azure Batch task.
+  * @param jarFileUri Azure Storage SAS URI of the JAR file containing libraries required by the evaluator shim.
+  * @throws RuntimeException wrapping the {@link IOException} if the Azure Batch task cannot be created.
+  */
+ public void onResourceRequested(final ResourceRequestEvent resourceRequestEvent,
+                                 final String containerId,
+                                 final URI jarFileUri) {
+   try {
+     createAzureBatchTask(containerId, jarFileUri);
+     this.outstandingResourceRequests.put(containerId, resourceRequestEvent);
+     this.outstandingResourceRequestCount.incrementAndGet();
+     this.updateRuntimeStatus();
+   } catch (IOException e) {
+     // The exception is passed as the 'thrown' argument. Logger.log(Level, String, Throwable) does
+     // not substitute "{0}" placeholders, so the message must not contain one.
+     LOG.log(Level.SEVERE, "Failed to create Azure Batch task for containerId = " + containerId, e);
+     throw new RuntimeException(e);
+   }
+ }
+
+ /**
+  * Callback invoked by the RemoteManager for every status message received from an evaluator shim.
+  * A message with status ONLINE is treated as a fulfilled resource request; any other status is
+  * logged and the message is dropped.
+  *
+  * @param statusMessage the message from the evaluator shim indicating that the shim has started and is ready to
+  *                      start the evaluator process.
+  */
+ @Override
+ public void onNext(final RemoteMessage<EvaluatorShimProtocol.EvaluatorShimStatusProto> statusMessage) {
+   final EvaluatorShimProtocol.EvaluatorShimStatusProto shimStatus = statusMessage.getMessage();
+   final String shimContainerId = shimStatus.getContainerId();
+   final String shimRemoteId = shimStatus.getRemoteIdentifier();
+
+   LOG.log(Level.INFO, "Got a status message from evaluator shim = {0} with containerId = {1} and status = {2}.",
+       new String[]{shimRemoteId, shimContainerId, shimStatus.getStatus().toString()});
+
+   if (shimStatus.getStatus() == EvaluatorShimProtocol.EvaluatorShimStatus.ONLINE) {
+     this.onResourceAllocated(shimContainerId, shimRemoteId, Optional.<CloudTask>empty());
+   } else {
+     LOG.log(Level.SEVERE, "Unexpected status returned from the evaluator shim: {0}. Ignoring the message.",
+         shimStatus.getStatus().toString());
+   }
+ }
+
+ /**
+  * Handles the notification that a pending resource request has been fulfilled. There are two sources:
+  * 1. The driver receives a message from the evaluator shim indicating it has successfully started.
+  * 2. {@link AzureBatchTaskStatusAlarmHandler} detects that the evaluator shim failed before sending the status
+  *    message.
+  * If a request is outstanding for the container, REEF is notified of the new node and of the resource
+  * allocation. The runtime status is updated in either case.
+  *
+  * @param containerId id of the container.
+  * @param remoteId remote address for the allocated container.
+  * @param cloudTask Azure Batch task which corresponds to the container.
+  */
+ public void onResourceAllocated(final String containerId,
+                                 final String remoteId,
+                                 final Optional<CloudTask> cloudTask) {
+   final ResourceRequestEvent pendingRequest = this.outstandingResourceRequests.remove(containerId);
+
+   if (pendingRequest == null) {
+     LOG.log(Level.WARNING, "No outstanding resource request found for container id = {0}.", containerId);
+     this.updateRuntimeStatus();
+     return;
+   }
+
+   this.outstandingResourceRequestCount.decrementAndGet();
+
+   // The Azure Batch task is expected to be in 'RUNNING' state. A task that is already
+   // 'COMPLETED' cannot receive instructions and is therefore recorded as failed.
+   if (cloudTask.isPresent() && TaskState.COMPLETED.equals(cloudTask.get().state())) {
+     this.failedResources.put(containerId, cloudTask.get());
+   }
+
+   LOG.log(Level.FINEST, "Notifying REEF of a new node: {0}", remoteId);
+   this.reefEventHandlers.onNodeDescriptor(
+       NodeDescriptorEventImpl.newBuilder()
+           .setIdentifier(RemoteIdentifierParser.parseNodeId(remoteId))
+           .setHostName(RemoteIdentifierParser.parseIp(remoteId))
+           .setPort(RemoteIdentifierParser.parsePort(remoteId))
+           .setMemorySize(pendingRequest.getMemorySize().get())
+           .build());
+
+   LOG.log(Level.FINEST, "Triggering a new ResourceAllocationEvent for remoteId = {0}.", remoteId);
+   this.reefEventHandlers.onResourceAllocation(
+       ResourceEventImpl.newAllocationBuilder()
+           .setIdentifier(containerId)
+           .setNodeId(RemoteIdentifierParser.parseNodeId(remoteId))
+           .setResourceMemory(pendingRequest.getMemorySize().get())
+           .setVirtualCores(pendingRequest.getVirtualCores().get())
+           .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME)
+           .build());
+
+   this.updateRuntimeStatus();
+ }
+
+ /**
+  * Event handler method for {@link ResourceLaunchEvent}. If the container was recorded as failed, REEF is
+  * notified of the task status. Otherwise, if the evaluator is known, the launch command is sent to the
+  * evaluator shim. An event for an unknown resource id is logged and ignored.
+  *
+  * @param resourceLaunchEvent an instance of {@link ResourceLaunchEvent}
+  * @param command OS command to launch the evaluator process.
+  * @param evaluatorConfigurationString evaluator configuration serialized as a String.
+  */
+ public void onResourceLaunched(final ResourceLaunchEvent resourceLaunchEvent,
+                                final String command,
+                                final String evaluatorConfigurationString) {
+   final String resourceId = resourceLaunchEvent.getIdentifier();
+
+   // Single lookup instead of containsKey() followed by get(): onResourceReleased() removes entries
+   // from this map, so a second lookup could return null if the two handlers run concurrently.
+   final CloudTask failedTask = this.failedResources.get(resourceId);
+
+   if (failedTask != null) {
+     LOG.log(Level.FINE, "ResourceLaunch event triggered on a failed container. " +
+         "Notifying REEF of failed container.");
+     this.onAzureBatchTaskStatus(failedTask);
+   } else if (this.evaluators.get(resourceId).isPresent()) {
+     LOG.log(Level.FINE, "Preparing to launch resourceId = {0}", resourceId);
+     this.launchEvaluator(resourceLaunchEvent, command, evaluatorConfigurationString);
+   } else {
+     LOG.log(Level.WARNING, "Received a ResourceLaunch event for an unknown resourceId = {0}", resourceId);
+   }
+
+   this.updateRuntimeStatus();
+ }
+
+ /**
+  * Event handler method for {@link ResourceReleaseEvent}. Sends a TERMINATE command to the appropriate evaluator
+  * shim, unless the resource was recorded as failed, in which case the event is ignored.
+  *
+  * @param resourceReleaseEvent the release event; its identifier selects the resource to release.
+  * @throws RuntimeException if the resource is not recorded as failed and no evaluator is known for it.
+  */
+ public void onResourceReleased(final ResourceReleaseEvent resourceReleaseEvent) {
+   final String resourceId = resourceReleaseEvent.getIdentifier();
+
+   // REEF Common will trigger a ResourceReleaseEvent even if the resource has failed. Since we know that the shim
+   // has already failed, we can safely ignore this. This check must come before the remote id lookup below,
+   // which throws when the evaluator is not known.
+   if (this.failedResources.remove(resourceId) != null) {
+     LOG.log(Level.INFO, "Received a ResourceReleaseEvent for a failed shim with resourceId = {0}. Ignoring.",
+         resourceId);
+     return;
+   }
+
+   // Throws a RuntimeException for an unknown evaluator, so no separate presence check is needed.
+   final String resourceRemoteId = getResourceRemoteId(resourceId);
+
+   final EventHandler<EvaluatorShimProtocol.EvaluatorShimControlProto> handler =
+       this.remoteManager.getHandler(resourceRemoteId, EvaluatorShimProtocol.EvaluatorShimControlProto.class);
+
+   LOG.log(Level.INFO, "Sending TERMINATE command to the shim with remoteId = {0}.", resourceRemoteId);
+   handler.onNext(
+       EvaluatorShimProtocol.EvaluatorShimControlProto
+           .newBuilder()
+           .setCommand(EvaluatorShimProtocol.EvaluatorShimCommand.TERMINATE)
+           .build());
+
+   this.updateRuntimeStatus();
+ }
+
+ /**
+  * Translates the given {@link CloudTask} into a
+  * {@link org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent} and hands it to REEF.
+  * The exit code is included only for tasks in COMPLETED state.
+  *
+  * @param cloudTask an instance of {@link CloudTask}.
+  */
+ public void onAzureBatchTaskStatus(final CloudTask cloudTask) {
+   final ResourceStatusEventImpl.Builder statusEvent =
+       ResourceStatusEventImpl.newBuilder()
+           .setIdentifier(cloudTask.id())
+           .setState(TaskStatusMapper.getReefTaskState(cloudTask))
+           .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME);
+
+   final boolean taskCompleted = cloudTask.state() == TaskState.COMPLETED;
+   if (taskCompleted) {
+     statusEvent.setExitCode(cloudTask.executionInfo().exitCode());
+   }
+
+   this.reefEventHandlers.onResourceStatus(statusEvent.build());
+ }
+
+ /**
+  * Closes the evaluator shim remote manager command channel.
+  *
+  * @throws RuntimeException wrapping any exception raised while closing the channel.
+  */
+ public void onClose() {
+   try {
+     this.evaluatorShimCommandChannel.close();
+   } catch (Exception e) {
+     // The exception is passed as the 'thrown' argument. Logger.log(Level, String, Throwable) does
+     // not substitute "{0}" placeholders, so the message must not contain one.
+     LOG.log(Level.WARNING, "An unexpected exception while closing the Evaluator Shim Command channel.", e);
+     throw new RuntimeException(e);
+   }
+ }
+
+ /**
+  * Packages every file found in the REEF global folder, as a LIB resource, into the evaluator shim JAR
+  * and uploads that JAR to Azure Storage.
+  *
+  * @return SAS URI to where the evaluator shim JAR was uploaded.
+  */
+ public URI generateShimJarFile() {
+   try {
+     final Set<FileResource> libraries = new HashSet<>();
+     final File[] globalFolderContents = new File(this.reefFileNames.getGlobalFolderPath()).listFiles();
+
+     // listFiles() yields null when the folder cannot be listed; treat that as an empty folder.
+     if (globalFolderContents != null) {
+       for (final File libraryFile : globalFolderContents) {
+         libraries.add(getFileResourceFromFile(libraryFile, FileType.LIB));
+       }
+     }
+
+     final File shimJar = this.jobJarMaker.newBuilder()
+         .addGlobalFileSet(libraries)
+         .build();
+
+     return uploadFile(shimJar);
+   } catch (IOException ex) {
+     LOG.log(Level.SEVERE, "Failed to build JAR file", ex);
+     throw new RuntimeException(ex);
+   }
+ }
+
+ private void updateRuntimeStatus() {
+ this.reefEventHandlers.onRuntimeStatus(RuntimeStatusEventImpl.newBuilder()
+ .setName(RUNTIME_NAME)
+ .setState(State.RUNNING)
+ .setOutstandingContainerRequests(this.outstandingResourceRequestCount.get())
+ .build());
+ }
+
+ private void launchEvaluator(final ResourceLaunchEvent resourceLaunchEvent,
+ final String command,
+ final String evaluatorConfigurationString) {
+ String resourceId = resourceLaunchEvent.getIdentifier();
+ String resourceRemoteId = getResourceRemoteId(resourceId);
+
+ Set<FileResource> fileResources = resourceLaunchEvent.getFileSet();
+ String fileUrl = "";
+ if (!fileResources.isEmpty()) {
+ try {
+ File jarFile = writeFileResourcesJarFile(fileResources);
+ fileUrl = uploadFile(jarFile).toString();
+ LOG.log(Level.FINE, "Uploaded evaluator file resources to Azure Storage at {0}.", fileUrl);
+ } catch (IOException e) {
+ LOG.log(Level.SEVERE, "Failed to generate zip archive for evaluator file resources: {0}.", e);
+ throw new RuntimeException(e);
+ }
+ } else {
+ LOG.log(Level.INFO, "No file resources found in ResourceLaunchEvent.");
+ }
+
+ LOG.log(Level.INFO, "Sending a command to the Evaluator shim with remoteId = {0} to start the evaluator.",
+ resourceRemoteId);
+ EventHandler<EvaluatorShimProtocol.EvaluatorShimControlProto> handler = this.remoteManager
+ .getHandler(resourceRemoteId, EvaluatorShimProtocol.EvaluatorShimControlProto.class);
+
+ handler.onNext(
+ EvaluatorShimProtocol.EvaluatorShimControlProto
+ .newBuilder()
+ .setCommand(EvaluatorShimProtocol.EvaluatorShimCommand.LAUNCH_EVALUATOR)
+ .setEvaluatorLaunchCommand(command)
+ .setEvaluatorConfigString(evaluatorConfigurationString)
+ .setEvaluatorFileResourcesUrl(fileUrl)
+ .build());
+ }
+
+ private String getEvaluatorShimLaunchCommand() {
+ return this.launchCommandBuilder.buildEvaluatorShimCommand(EVALUATOR_SHIM_MEMORY_MB,
+ this.azureBatchFileNames.getEvaluatorShimConfigurationPath());
+ }
+
+ /**
+ * @return The name under which the evaluator shim configuration will be stored in
+ * REEF_BASE_FOLDER/LOCAL_FOLDER.
+ */
+ private FileResource getFileResourceFromFile(final File configFile, final FileType type) {
+ return FileResourceImpl.newBuilder()
+ .setName(configFile.getName())
+ .setPath(configFile.getPath())
+ .setType(type).build();
+ }
+
+ /**
+  * Writes the evaluator shim configuration for the task into the REEF local folder, uploads it and
+  * submits a task with the given id to the Azure Batch job.
+  *
+  * @param taskId id of the Azure Batch task to create.
+  * @param jarFileUri URI of the JAR file passed along with the task submission.
+  * @throws IOException if writing, uploading or submitting fails.
+  */
+ private void createAzureBatchTask(final String taskId, final URI jarFileUri) throws IOException {
+   final Configuration shimConfiguration = this.evaluatorShimConfigurationProvider.getConfiguration(taskId);
+
+   // The task id is prepended so that every task gets its own configuration file.
+   final String shimConfigurationFileName =
+       taskId + '-' + this.azureBatchFileNames.getEvaluatorShimConfigurationName();
+   final File shimConfigurationFile =
+       new File(this.reefFileNames.getLocalFolderPath(), shimConfigurationFileName);
+   this.configurationSerializer.toFile(shimConfiguration, shimConfigurationFile);
+
+   final URI shimConfigurationUri = this.uploadFile(shimConfigurationFile);
+   this.azureBatchHelper.submitTask(
+       this.azureBatchHelper.getAzureBatchJobId(),
+       taskId,
+       jarFileUri,
+       shimConfigurationUri,
+       getEvaluatorShimLaunchCommand());
+ }
+
+ private File writeFileResourcesJarFile(final Set<FileResource> fileResourceSet) throws IOException {
+ return this.jobJarMaker.newBuilder().addLocalFileSet(fileResourceSet).build();
+ }
+
+ /**
+  * Uploads the given file into the storage folder of the current Azure Batch job.
+  *
+  * @param jarFile the file to upload.
+  * @return the URI returned by the storage client for the uploaded file.
+  * @throws IOException if the upload fails.
+  */
+ private URI uploadFile(final File jarFile) throws IOException {
+   final String destinationFolder =
+       this.azureBatchFileNames.getStorageJobFolder(this.azureBatchHelper.getAzureBatchJobId());
+   LOG.log(Level.FINE, "Uploading {0} to {1}.", new Object[]{jarFile.getAbsolutePath(), destinationFolder});
+
+   final URI uploadedFileUri = this.azureStorageClient.uploadFile(destinationFolder, jarFile);
+   return uploadedFileUri;
+ }
+
+ private String getResourceRemoteId(final String resourceId) {
+ Optional<EvaluatorManager> optionalEvaluatorManager = this.evaluators.get(resourceId);
+
+ if (!optionalEvaluatorManager.isPresent()) {
+ LOG.log(Level.SEVERE, "Unknown evaluator with resourceId = {0}", resourceId);
+ throw new RuntimeException("Unknown evaluator with resourceId = " + resourceId);
+ }
+
+ NodeDescriptor nodeDescriptor = optionalEvaluatorManager.get().getEvaluatorDescriptor().getNodeDescriptor();
+ return (new SocketRemoteIdentifier(nodeDescriptor.getInetSocketAddress())).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java
new file mode 100644
index 0000000..b642c4d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A {@link ResourceLaunchHandler} for Azure Batch.
+ * Every {@link ResourceLaunchEvent} it receives is forwarded to the {@link AzureBatchResourceManager}.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceLaunchHandler implements ResourceLaunchHandler {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceLaunchHandler.class.getName());
+
+  private final AzureBatchResourceManager azureBatchResourceManager;
+
+  @Inject
+  AzureBatchResourceLaunchHandler(final AzureBatchResourceManager azureBatchResourceManager) {
+    this.azureBatchResourceManager = azureBatchResourceManager;
+  }
+
+  /**
+   * Logs the incoming launch event and hands it over to the resource manager.
+   *
+   * @param resourceLaunchEvent the resource launch event to process.
+   */
+  @Override
+  public void onNext(final ResourceLaunchEvent resourceLaunchEvent) {
+    LOG.log(Level.FINEST, "Got ResourceLaunchEvent in AzureBatchResourceLaunchHandler");
+    this.azureBatchResourceManager.onResourceLaunched(resourceLaunchEvent);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java
new file mode 100644
index 0000000..8fbbf17
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The Driver's view of all resources in Azure Batch pool.
+ * Records the {@link ResourceRequestEvent} behind every generated container id and keeps the
+ * task status alarm enabled while containers are tracked, disabling it once the count drops to zero.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceManager {
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceManager.class.getName());
+
+  /** Request events keyed by the container id generated for them. */
+  private final Map<String, ResourceRequestEvent> containerRequests;
+  /** Number of containers currently tracked in {@link #containerRequests}. */
+  private final AtomicInteger containerCount;
+
+  private final ConfigurationSerializer configurationSerializer;
+  private final CommandBuilder launchCommandBuilder;
+  private final AzureBatchEvaluatorShimManager evaluatorShimManager;
+  private final AzureBatchTaskStatusAlarmHandler azureBatchTaskStatusAlarmHandler;
+
+  /** Computed as {@code 1.0 - jvmHeapSlack}; passed to the launch command builder. */
+  private final double jvmHeapFactor;
+
+  @Inject
+  AzureBatchResourceManager(
+      final ConfigurationSerializer configurationSerializer,
+      final CommandBuilder launchCommandBuilder,
+      final AzureBatchEvaluatorShimManager evaluatorShimManager,
+      final AzureBatchTaskStatusAlarmHandler azureBatchTaskStatusAlarmHandler,
+      @Parameter(JVMHeapSlack.class) final double jvmHeapSlack) {
+    this.configurationSerializer = configurationSerializer;
+    this.evaluatorShimManager = evaluatorShimManager;
+    this.jvmHeapFactor = 1.0 - jvmHeapSlack;
+    this.launchCommandBuilder = launchCommandBuilder;
+    this.containerRequests = new ConcurrentHashMap<>();
+    this.containerCount = new AtomicInteger(0);
+    this.azureBatchTaskStatusAlarmHandler = azureBatchTaskStatusAlarmHandler;
+  }
+
+  /**
+   * This method is invoked when a {@link ResourceRequestEvent} is triggered.
+   * The shim jar URI is obtained once; then, for each requested resource, a fresh container id is
+   * generated, the request is recorded under that id and forwarded to the evaluator shim manager.
+   * The task status alarm is enabled if at least one container is tracked afterwards.
+   *
+   * @param resourceRequestEvent the resource request event.
+   */
+  public void onResourceRequested(final ResourceRequestEvent resourceRequestEvent) {
+    LOG.log(Level.FINEST, "Got ResourceRequestEvent in AzureBatchResourceManager,");
+    URI jarFileUri = this.evaluatorShimManager.generateShimJarFile();
+    for (int r = 0; r < resourceRequestEvent.getResourceCount(); r++) {
+      final String containerId = generateContainerId();
+      LOG.log(Level.FINE, "containerId in AzureBatchResourceManager {0}", containerId);
+      this.containerRequests.put(containerId, resourceRequestEvent);
+      this.containerCount.incrementAndGet();
+      this.evaluatorShimManager.onResourceRequested(resourceRequestEvent, containerId, jarFileUri);
+    }
+
+    int currentContainerCount = this.containerCount.get();
+    if (currentContainerCount > 0) {
+      this.azureBatchTaskStatusAlarmHandler.enableAlarm();
+    }
+  }
+
+  /**
+   * This method is invoked when a {@link ResourceReleaseEvent} is triggered.
+   * Removes the recorded request for the released id (logging a warning if none exists), disables
+   * the task status alarm once no containers remain, and always forwards the event to the
+   * evaluator shim manager.
+   *
+   * @param resourceReleaseEvent the resource release event.
+   */
+  public void onResourceReleased(final ResourceReleaseEvent resourceReleaseEvent) {
+    String id = resourceReleaseEvent.getIdentifier();
+    LOG.log(Level.FINEST, "Got ResourceReleasedEvent for Id: {0} in AzureBatchResourceManager", id);
+
+    ResourceRequestEvent removedEvent = this.containerRequests.remove(id);
+    if (removedEvent == null) {
+      LOG.log(Level.WARNING,
+          "Ignoring attempt to remove non-existent containerRequest for Id: {0} in AzureBatchResourceManager", id);
+    } else {
+      int currentContainerCount = this.containerCount.decrementAndGet();
+      if (currentContainerCount <= 0) {
+        this.azureBatchTaskStatusAlarmHandler.disableAlarm();
+      }
+    }
+
+    this.evaluatorShimManager.onResourceReleased(resourceReleaseEvent);
+  }
+
+  /**
+   * This method is called when the {@link ResourceLaunchEvent} is triggered.
+   * Builds the evaluator launch command from the memory size of the originally recorded request,
+   * serializes the evaluator configuration and forwards both to the evaluator shim manager.
+   *
+   * @param resourceLaunchEvent the resource launch event.
+   * @throws IllegalStateException if no container request is recorded for the event's identifier.
+   */
+  public void onResourceLaunched(final ResourceLaunchEvent resourceLaunchEvent) {
+    String id = resourceLaunchEvent.getIdentifier();
+    LOG.log(Level.FINEST, "Got ResourceLaunchEvent for Id: {0} in AzureBatchResourceManager", id);
+
+    // The map returns null for ids that were never requested here or were already released;
+    // fail with a descriptive error instead of a bare NullPointerException.
+    final ResourceRequestEvent requestEvent = this.containerRequests.get(id);
+    if (requestEvent == null) {
+      LOG.log(Level.SEVERE, "No containerRequest found for Id: {0} in AzureBatchResourceManager", id);
+      throw new IllegalStateException("No containerRequest found for Id: " + id);
+    }
+
+    final int evaluatorMemory = requestEvent.getMemorySize().get();
+    String launchCommand = this.launchCommandBuilder.buildEvaluatorCommand(resourceLaunchEvent,
+        evaluatorMemory, this.jvmHeapFactor);
+    String evaluatorConfigurationString =
+        this.configurationSerializer.toString(resourceLaunchEvent.getEvaluatorConf());
+    this.evaluatorShimManager.onResourceLaunched(resourceLaunchEvent, launchCommand, evaluatorConfigurationString);
+  }
+
+  /**
+   * @return a new random UUID string used as the container id.
+   */
+  private String generateContainerId() {
+    return UUID.randomUUID().toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java
new file mode 100644
index 0000000..e07e91d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStartHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A {@link ResourceManagerStartHandler} for Azure Batch runtime.
+ * It only logs that the runtime start event was received.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceManagerStartHandler implements ResourceManagerStartHandler {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceManagerStartHandler.class.getName());
+
+  @Inject
+  AzureBatchResourceManagerStartHandler() {
+    // Nothing to initialize.
+  }
+
+  /**
+   * Logs the start of the Azure Batch runtime.
+   *
+   * @param runtimeStart the runtime start event; not inspected.
+   */
+  @Override
+  public void onNext(final RuntimeStart runtimeStart) {
+    LOG.log(Level.FINE, "Azure batch runtime has been started...");
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java
new file mode 100644
index 0000000..93ef918
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A {@link ResourceManagerStopHandler} for Azure Batch runtime.
+ * On runtime stop it closes the {@link AzureBatchEvaluatorShimManager}.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceManagerStopHandler implements ResourceManagerStopHandler {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceManagerStopHandler.class.getName());
+
+  private final AzureBatchEvaluatorShimManager azureBatchEvaluatorShimManager;
+
+  @Inject
+  AzureBatchResourceManagerStopHandler(final AzureBatchEvaluatorShimManager azureBatchEvaluatorShimManager) {
+    this.azureBatchEvaluatorShimManager = azureBatchEvaluatorShimManager;
+  }
+
+  /**
+   * Logs the stop of the Azure Batch runtime and invokes {@code onClose()} on the shim manager.
+   *
+   * @param runtimeStop the runtime stop event; not inspected.
+   */
+  @Override
+  public void onNext(final RuntimeStop runtimeStop) {
+    LOG.log(Level.FINE, "Azure batch runtime has been stopped...");
+    this.azureBatchEvaluatorShimManager.onClose();
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java
new file mode 100644
index 0000000..e3da49b
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A {@link ResourceReleaseHandler} for Azure Batch runtime.
+ * Every {@link ResourceReleaseEvent} it receives is forwarded to the {@link AzureBatchResourceManager}.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceReleaseHandler implements ResourceReleaseHandler {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceReleaseHandler.class.getName());
+
+  private final AzureBatchResourceManager azureBatchResourceManager;
+
+  @Inject
+  AzureBatchResourceReleaseHandler(final AzureBatchResourceManager azureBatchResourceManager) {
+    this.azureBatchResourceManager = azureBatchResourceManager;
+  }
+
+  /**
+   * Logs the incoming release event and hands it over to the resource manager.
+   *
+   * @param resourceReleaseEvent the resource release event to process.
+   */
+  @Override
+  public void onNext(final ResourceReleaseEvent resourceReleaseEvent) {
+    // The message names this class; it previously named the launch handler by mistake.
+    LOG.log(Level.FINEST, "Got ResourceReleaseEvent in AzureBatchResourceReleaseHandler");
+    this.azureBatchResourceManager.onResourceReleased(resourceReleaseEvent);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java
new file mode 100644
index 0000000..fb9c000
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A {@link ResourceRequestHandler} for Azure Batch runtime.
+ * Every {@link ResourceRequestEvent} it receives is forwarded to the {@link AzureBatchResourceManager}.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceRequestHandler implements ResourceRequestHandler {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceRequestHandler.class.getName());
+
+  private final AzureBatchResourceManager azureBatchResourceManager;
+
+  @Inject
+  AzureBatchResourceRequestHandler(final AzureBatchResourceManager azureBatchResourceManager) {
+    this.azureBatchResourceManager = azureBatchResourceManager;
+  }
+
+  /**
+   * Logs the requested memory size and virtual cores, then hands the event to the resource manager.
+   *
+   * @param resourceRequestEvent the resource request event to process.
+   */
+  @Override
+  public void onNext(final ResourceRequestEvent resourceRequestEvent) {
+    final Object[] requestDetails = {
+        resourceRequestEvent.getMemorySize(),
+        resourceRequestEvent.getVirtualCores()
+    };
+    LOG.log(Level.FINEST, "Got ResourceRequestEvent in AzureBatchResourceRequestHandler: memory = {0}, cores = {1}.",
+        requestDetails);
+    this.azureBatchResourceManager.onResourceRequested(resourceRequestEvent);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java
new file mode 100644
index 0000000..0afe07d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import com.microsoft.azure.batch.protocol.models.CloudTask;
+import com.microsoft.azure.batch.protocol.models.TaskState;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchTaskStatusCheckPeriod;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+
+import javax.inject.Inject;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Class that gets the status of the tasks from Azure Batch for the job that is currently in progress
+ * and notifies REEF of the status.
+ * Unlike YARN, Azure Batch does not support Resource Manager callbacks. Therefore, we must periodically call
+ * Azure Batch APIs to determine the status of tasks running inside our job.
+ */
+@Private
+@DriverSide
+final class AzureBatchTaskStatusAlarmHandler implements EventHandler<Alarm> {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchTaskStatusAlarmHandler.class.getName());
+
+  /**
+   * A placeholder remote ID which is used for reporting failed containers when the
+   * failure occurs before Wake communication channel can be established and the real
+   * remote ID is unknown.
+   */
+  private static final String PLACEHOLDER_REMOTE_ID = "socket://0.0.0.0:0";
+
+  private final InjectionFuture<AzureBatchEvaluatorShimManager> evaluatorShimManager;
+  private final AzureBatchHelper azureBatchHelper;
+  private final Evaluators evaluators;
+  private final Clock clock;
+  /** Delay, in milliseconds, between two consecutive status checks. */
+  private final int taskStatusCheckPeriod;
+  /** Whether the alarm re-schedules itself; written only from synchronized methods. */
+  private boolean isAlarmEnabled;
+
+  @Inject
+  private AzureBatchTaskStatusAlarmHandler(
+      final InjectionFuture<AzureBatchEvaluatorShimManager> evaluatorShimManager,
+      final AzureBatchHelper azureBatchHelper,
+      final Evaluators evaluators,
+      final Clock clock,
+      @Parameter(AzureBatchTaskStatusCheckPeriod.class) final int taskStatusCheckPeriod) {
+    this.evaluatorShimManager = evaluatorShimManager;
+    this.azureBatchHelper = azureBatchHelper;
+    this.evaluators = evaluators;
+    this.clock = clock;
+    this.taskStatusCheckPeriod = taskStatusCheckPeriod;
+  }
+
+  /**
+   * This method is periodically invoked by the Runtime Clock. It will call Azure Batch APIs to determine
+   * the status of tasks running inside the job and notify REEF of tasks statuses that correspond to running
+   * evaluators. If the alarm is still enabled afterwards, the next check is scheduled.
+   *
+   * @param alarm alarm object.
+   */
+  @Override
+  public void onNext(final Alarm alarm) {
+    final String jobId = this.azureBatchHelper.getAzureBatchJobId();
+    final List<CloudTask> jobTasks = this.azureBatchHelper.getTaskStatusForJob(jobId);
+
+    // Report status if the task has an associated active container.
+    LOG.log(Level.FINER, "Found {0} tasks from job id {1}", new Object[]{jobTasks.size(), jobId});
+    for (final CloudTask task : jobTasks) {
+      final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(task.id());
+
+      if (!evaluatorManager.isPresent()) {
+        if (TaskState.COMPLETED.equals(task.state())) {
+          // This indicates that the evaluator shim exited prematurely. We inform REEF of resource allocation
+          // so it's possible to trigger an event signaling resource failure later.
+          LOG.log(Level.INFO, "Azure Batch task id = {0} is in 'COMPLETED' state, but it does not have " +
+              "an Evaluator associated with it. This indicates that the evaluator shim has failed before " +
+              "it could send a callback to the driver.", task.id());
+          this.evaluatorShimManager.get().onResourceAllocated(task.id(), PLACEHOLDER_REMOTE_ID, Optional.of(task));
+        } else {
+          // This usually means that the evaluator shim has started, but hasn't sent the status message
+          // back to the driver yet.
+          LOG.log(Level.FINE, "No Evaluator found for Azure Batch task id = {0}. Ignoring.", task.id());
+        }
+      } else if (evaluatorManager.get().isClosedOrClosing()) {
+        LOG.log(Level.FINE, "Evaluator id = {0} is closed. Ignoring.", task.id());
+      } else {
+        LOG.log(Level.FINE, "Reporting status for Task Id: {0} is [Azure Batch Status]:{1} ",
+            new Object[]{task.id(), task.state().toString()});
+        this.evaluatorShimManager.get().onAzureBatchTaskStatus(task);
+      }
+    }
+
+    // Re-arm under the same monitor that guards enableAlarm() and disableAlarm().
+    synchronized (this) {
+      if (this.isAlarmEnabled) {
+        this.scheduleAlarm();
+      }
+    }
+  }
+
+  /**
+   * Enable the periodic alarm to send status updates.
+   * Does nothing besides logging if the alarm is already enabled.
+   */
+  public synchronized void enableAlarm() {
+    if (this.isAlarmEnabled) {
+      LOG.log(Level.FINE, "Alarm is already enabled.");
+      return;
+    }
+
+    LOG.log(Level.FINE, "Enabling the alarm and scheduling it to fire in {0} ms.", this.taskStatusCheckPeriod);
+    this.isAlarmEnabled = true;
+    this.scheduleAlarm();
+  }
+
+  /**
+   * Disable the periodic alarm to send status updates.
+   * An alarm that is already scheduled is not cancelled here; it simply does not re-schedule itself.
+   */
+  public synchronized void disableAlarm() {
+    this.isAlarmEnabled = false;
+  }
+
+  /**
+   * Schedules this handler on the clock to fire after the configured check period.
+   */
+  private void scheduleAlarm() {
+    this.clock.scheduleAlarm(this.taskStatusCheckPeriod, this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java
new file mode 100644
index 0000000..9db16ec
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.resourcemanager.*;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Logger;
+
+/**
+ * Helper that represents the REEF layer to the Azure Batch runtime.
+ * Each method passes its event on to the matching injected REEF event handler.
+ */
+@Private
+public final class REEFEventHandlers {
+  private static final Logger LOG = Logger.getLogger(REEFEventHandlers.class.getName());
+
+  private final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler;
+  private final EventHandler<RuntimeStatusEvent> runtimeStatusHandler;
+  private final EventHandler<ResourceAllocationEvent> resourceAllocationHandler;
+  private final EventHandler<ResourceStatusEvent> resourceStatusHandler;
+
+  @Inject
+  REEFEventHandlers(@Parameter(RuntimeParameters.NodeDescriptorHandler.class)
+                    final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler,
+                    @Parameter(RuntimeParameters.RuntimeStatusHandler.class)
+                    final EventHandler<RuntimeStatusEvent> runtimeStatusProtoEventHandler,
+                    @Parameter(RuntimeParameters.ResourceAllocationHandler.class)
+                    final EventHandler<ResourceAllocationEvent> resourceAllocationHandler,
+                    @Parameter(RuntimeParameters.ResourceStatusHandler.class)
+                    final EventHandler<ResourceStatusEvent> resourceStatusHandler) {
+    this.nodeDescriptorEventHandler = nodeDescriptorEventHandler;
+    this.runtimeStatusHandler = runtimeStatusProtoEventHandler;
+    this.resourceAllocationHandler = resourceAllocationHandler;
+    this.resourceStatusHandler = resourceStatusHandler;
+  }
+
+  /**
+   * Inform REEF of a node.
+   *
+   * @param nodeDescriptorEvent the node descriptor event passed to the node descriptor handler.
+   */
+  void onNodeDescriptor(final NodeDescriptorEvent nodeDescriptorEvent) {
+    this.nodeDescriptorEventHandler.onNext(nodeDescriptorEvent);
+  }
+
+  /**
+   * Update REEF's view on the runtime status.
+   *
+   * @param runtimeStatusEvent the runtime status event passed to the runtime status handler.
+   */
+  @Private
+  public void onRuntimeStatus(final RuntimeStatusEvent runtimeStatusEvent) {
+    this.runtimeStatusHandler.onNext(runtimeStatusEvent);
+  }
+
+  /**
+   * Inform REEF of a fresh resource allocation.
+   *
+   * @param resourceAllocationEvent the allocation event passed to the resource allocation handler.
+   */
+  @Private
+  public void onResourceAllocation(final ResourceAllocationEvent resourceAllocationEvent) {
+    this.resourceAllocationHandler.onNext(resourceAllocationEvent);
+  }
+
+  /**
+   * Update REEF on a change to the status of a resource.
+   *
+   * @param resourceStatusEvent the status event passed to the resource status handler.
+   */
+  void onResourceStatus(final ResourceStatusEvent resourceStatusEvent) {
+    this.resourceStatusHandler.onNext(resourceStatusEvent);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java
new file mode 100644
index 0000000..aa61db0
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Runtime Identifier Implementation.
+ */
+@Private
+public final class RuntimeIdentifier {
+
+ /**
+ * The same value is defined on the C# side in Org.Apache.REEF.Common.Runtime.RuntimeName.
+ */
+ public static final String RUNTIME_NAME = "AzBatch";
+
+ private RuntimeIdentifier() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java
new file mode 100644
index 0000000..37addfe
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver-side implementation of the REEF runtime for Azure Batch.
+ */
+package org.apache.reef.runtime.azbatch.driver;
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java
new file mode 100644
index 0000000..1579c26
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.evaluator;
+
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+import com.microsoft.windowsazure.storage.blob.CloudBlockBlob;
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.EvaluatorShimProtocol;
+import org.apache.reef.runtime.azbatch.parameters.ContainerIdentifier;
+import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+/**
+ * The evaluator shim acts as a wrapper process around the Evaluator. Azure Batch starts this process on the evaluator
+ * node at the time that the resource is allocated. Once started, the evaluator shim process will send a status
+ * message back to the Driver, which triggers a
+ * {@link org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent} on the Driver side.
+ * The evaluator shim will then wait for a command from the Driver to start the Evaluator process.
+ * Upon receiving the command, the shim will launch the evaluator process and wait for it to exit. After receiving
+ * a terminate command, the evaluator shim will exit thus releasing the resource and completing the Azure Batch task.
+ */
+@Private
+@EvaluatorSide
+public final class EvaluatorShim
+ implements EventHandler<RemoteMessage<EvaluatorShimProtocol.EvaluatorShimControlProto>> {
+ private static final Logger LOG = Logger.getLogger(EvaluatorShim.class.getName());
+
+ private final RemoteManager remoteManager;
+ private final REEFFileNames reefFileNames;
+ private final AzureBatchFileNames azureBatchFileNames;
+ private final ConfigurationSerializer configurationSerializer;
+
+ private final String driverRemoteId;
+ private final String containerId;
+
+ private final EventHandler<EvaluatorShimProtocol.EvaluatorShimStatusProto> evaluatorShimStatusChannel;
+ private final AutoCloseable evaluatorShimCommandChannel;
+
+ private final ExecutorService threadPool;
+
+ private Process evaluatorProcess;
+ private Integer evaluatorProcessExitValue;
+
+ @Inject
+ EvaluatorShim(final REEFFileNames reefFileNames,
+ final AzureBatchFileNames azureBatchFileNames,
+ final ConfigurationSerializer configurationSerializer,
+ final RemoteManager remoteManager,
+ @Parameter(DriverRemoteIdentifier.class) final String driverRemoteId,
+ @Parameter(ContainerIdentifier.class) final String containerId) {
+ this.reefFileNames = reefFileNames;
+ this.azureBatchFileNames = azureBatchFileNames;
+ this.configurationSerializer = configurationSerializer;
+
+ this.driverRemoteId = driverRemoteId;
+ this.containerId = containerId;
+
+ this.remoteManager = remoteManager;
+ this.evaluatorShimStatusChannel = this.remoteManager.getHandler(this.driverRemoteId,
+ EvaluatorShimProtocol.EvaluatorShimStatusProto.class);
+
+ this.evaluatorShimCommandChannel = this.remoteManager
+ .registerHandler(EvaluatorShimProtocol.EvaluatorShimControlProto.class, this);
+
+ this.threadPool = Executors.newCachedThreadPool();
+ }
+
+ /**
+ * Starts the {@link EvaluatorShim}.
+ */
+ public void run() {
+ LOG.log(Level.FINEST, "Entering EvaluatorShim.run().");
+ this.onStart();
+ }
+
+ /**
+ * Stops the {@link EvaluatorShim}.
+ */
+ public void stop() {
+ LOG.log(Level.FINEST, "Entering EvaluatorShim.stop().");
+ this.onStop();
+ }
+
+ /**
+ * This method is invoked by the Remote Manager when a command message from the Driver is received.
+ *
+ * @param remoteMessage the message sent to the evaluator shim by the Driver.
+ */
+ @Override
+ public void onNext(final RemoteMessage<EvaluatorShimProtocol.EvaluatorShimControlProto> remoteMessage) {
+ final EvaluatorShimProtocol.EvaluatorShimCommand command = remoteMessage.getMessage().getCommand();
+ switch (command) {
+ case LAUNCH_EVALUATOR:
+ LOG.log(Level.INFO, "Received a command to launch the Evaluator.");
+ this.threadPool.submit(new Runnable() {
+ @Override
+ public void run() {
+ EvaluatorShim.this.onEvaluatorLaunch(remoteMessage.getMessage().getEvaluatorLaunchCommand(),
+ remoteMessage.getMessage().getEvaluatorConfigString(),
+ remoteMessage.getMessage().getEvaluatorFileResourcesUrl());
+ }
+ });
+ break;
+
+ case TERMINATE:
+ LOG.log(Level.INFO, "Received a command to terminate the EvaluatorShim.");
+ this.threadPool.submit(new Runnable() {
+ @Override
+ public void run() {
+ EvaluatorShim.this.onStop();
+ }
+ });
+ break;
+
+ default:
+ LOG.log(Level.WARNING, "An unknown command was received by the EvaluatorShim: {0}.", command);
+ throw new IllegalArgumentException("An unknown command was received by the EvaluatorShim.");
+ }
+ }
+
+ private void onStart() {
+ LOG.log(Level.FINEST, "Entering EvaluatorShim.onStart().");
+
+ LOG.log(Level.INFO, "Reporting back to the driver with Shim Status = {0}",
+ EvaluatorShimProtocol.EvaluatorShimStatus.ONLINE);
+ this.evaluatorShimStatusChannel.onNext(
+ EvaluatorShimProtocol.EvaluatorShimStatusProto
+ .newBuilder()
+ .setRemoteIdentifier(this.remoteManager.getMyIdentifier())
+ .setContainerId(this.containerId)
+ .setStatus(EvaluatorShimProtocol.EvaluatorShimStatus.ONLINE)
+ .build());
+
+ LOG.log(Level.FINEST, "Exiting EvaluatorShim.onStart().");
+ }
+
+ private void onStop() {
+ LOG.log(Level.FINEST, "Entering EvaluatorShim.onStop().");
+
+ try {
+ LOG.log(Level.INFO, "Closing EvaluatorShim Control channel.");
+ this.evaluatorShimCommandChannel.close();
+ } catch (Exception e) {
+ LOG.log(Level.SEVERE, "An unexpected exception occurred while attempting to close the EvaluatorShim " +
+ "control channel.");
+ throw new RuntimeException(e);
+ }
+
+ try {
+ LOG.log(Level.INFO, "Closing the Remote Manager.");
+ this.remoteManager.close();
+ } catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to close the RemoteManager with the following exception: {0}.", e);
+ throw new RuntimeException(e);
+ }
+
+ LOG.log(Level.INFO, "Shutting down the thread pool.");
+ this.threadPool.shutdown();
+
+ LOG.log(Level.FINEST, "Exiting EvaluatorShim.onStop().");
+ }
+
+ private void onEvaluatorLaunch(final String launchCommand, final String evaluatorConfigString,
+ final String fileResourcesUrl) {
+ LOG.log(Level.FINEST, "Entering EvaluatorShim.onEvaluatorLaunch().");
+
+ if (StringUtils.isNotBlank(fileResourcesUrl)) {
+ LOG.log(Level.FINER, "Downloading evaluator resource file archive from {0}.", fileResourcesUrl);
+ try {
+ File tmpFile = downloadFile(fileResourcesUrl);
+ extractFiles(tmpFile);
+ } catch (StorageException | IOException e) {
+ LOG.log(Level.SEVERE, "Failed to download evaluator file resources: {0}. {1}",
+ new Object[]{fileResourcesUrl, e});
+ throw new RuntimeException(e);
+ }
+ } else {
+ LOG.log(Level.FINER, "No file resources URL given.");
+ }
+
+ File evaluatorConfigurationFile = new File(this.reefFileNames.getEvaluatorConfigurationPath());
+ LOG.log(Level.FINER, "Persisting evaluator config at: {0}", evaluatorConfigurationFile.getAbsolutePath());
+
+ try {
+ boolean newFileCreated = evaluatorConfigurationFile.createNewFile();
+ LOG.log(Level.FINEST,
+ newFileCreated ? "Created a new file for persisting evaluator configuration at {0}."
+ : "Using existing file for persisting evaluator configuration at {0}.",
+ evaluatorConfigurationFile.getAbsolutePath());
+
+ Configuration evaluatorConfiguration = this.configurationSerializer.fromString(evaluatorConfigString);
+ this.configurationSerializer.toFile(evaluatorConfiguration, evaluatorConfigurationFile);
+ } catch (final IOException | BindException e) {
+ LOG.log(Level.SEVERE, "An unexpected exception occurred while attempting to deserialize and write " +
+ "Evaluator configuration file. {0}", e);
+ throw new RuntimeException("Unable to write configuration.", e);
+ }
+
+ LOG.log(Level.INFO, "Launching the evaluator by invoking the following command: " + launchCommand);
+
+ try {
+ final List<String> command = Arrays.asList(launchCommand.split(" "));
+ this.evaluatorProcess = new ProcessBuilder()
+ .command(command)
+ .redirectError(new File(this.azureBatchFileNames.getEvaluatorStdErrFilename()))
+ .redirectOutput(new File(this.azureBatchFileNames.getEvaluatorStdOutFilename()))
+ .start();
+
+ // This will block the current thread until the Evaluator process completes.
+ this.evaluatorProcessExitValue = EvaluatorShim.this.evaluatorProcess.waitFor();
+ LOG.log(Level.INFO, "Evaluator process completed with exit value: {0}.", this.evaluatorProcessExitValue);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ LOG.log(Level.FINEST, "Exiting EvaluatorShim.onEvaluatorLaunch().");
+ }
+
+ private File downloadFile(final String url) throws IOException, StorageException {
+ URI fileUri = URI.create(url);
+ File downloadedFile = new File(this.azureBatchFileNames.getEvaluatorResourceFilesJarName());
+ LOG.log(Level.FINE, "Downloading evaluator file resources to {0}.", downloadedFile.getAbsolutePath());
+
+ try (FileOutputStream fileStream = new FileOutputStream(downloadedFile)) {
+ CloudBlob blob = new CloudBlockBlob(fileUri);
+ blob.download(fileStream);
+ }
+
+ return downloadedFile;
+ }
+
+ private void extractFiles(final File zipFile) throws IOException {
+ try (ZipFile zipFileHandle = new ZipFile(zipFile)) {
+ Enumeration<? extends ZipEntry> zipEntries = zipFileHandle.entries();
+ while (zipEntries.hasMoreElements()) {
+ ZipEntry zipEntry = zipEntries.nextElement();
+ File file = new File(this.reefFileNames.getREEFFolderName() + '/' + zipEntry.getName());
+ if (file.exists()) {
+ LOG.log(Level.INFO, "Skipping entry {0} because the file already exists.", zipEntry.getName());
+ } else {
+ if (zipEntry.isDirectory()) {
+ if (file.mkdirs()) {
+ LOG.log(Level.INFO, "Creating directory {0}.", zipEntry.getName());
+ } else {
+ LOG.log(Level.INFO, "Directory {0} already exists. Ignoring.", zipEntry.getName());
+ }
+ } else {
+ try (InputStream inputStream = zipFileHandle.getInputStream(zipEntry)) {
+ LOG.log(Level.INFO, "Extracting {0}.", zipEntry.getName());
+ Files.copy(inputStream, Paths.get(this.reefFileNames.getREEFFolderName() + '/' + zipEntry.getName()));
+ LOG.log(Level.INFO, "Extracting {0} completed.", zipEntry.getName());
+ }
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
new file mode 100644
index 0000000..c114ba2
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.evaluator;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.parameters.ContainerIdentifier;
+import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * ConfigurationModule to create evaluator shim configurations.
+ */
+@Private
+@EvaluatorSide
+public final class EvaluatorShimConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * @see org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier
+ */
+ public static final RequiredParameter<String> DRIVER_REMOTE_IDENTIFIER = new RequiredParameter<>();
+
+ /**
+ * @see org.apache.reef.runtime.azbatch.parameters.ContainerIdentifier
+ */
+ public static final RequiredParameter<String> CONTAINER_IDENTIFIER = new RequiredParameter<>();
+
+ public static final ConfigurationModule CONF = new EvaluatorShimConfiguration()
+ .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+ .bindNamedParameter(DriverRemoteIdentifier.class, DRIVER_REMOTE_IDENTIFIER)
+ .bindNamedParameter(ContainerIdentifier.class, CONTAINER_IDENTIFIER)
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java
new file mode 100644
index 0000000..bfc5c1d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.evaluator;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.parameters.EvaluatorShimConfigFilePath;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The main entry point into the {@link EvaluatorShim}.
+ */
+@Private
+@EvaluatorSide
+public final class EvaluatorShimLauncher {
+
+ private static final Logger LOG = Logger.getLogger(EvaluatorShimLauncher.class.getName());
+
+ private final String configurationFilePath;
+ private final ConfigurationSerializer configurationSerializer;
+
+ @Inject
+ EvaluatorShimLauncher(
+ @Parameter(EvaluatorShimConfigFilePath.class) final String configurationFilePath,
+ final ConfigurationSerializer configurationSerializer) {
+ this.configurationFilePath = configurationFilePath;
+ this.configurationSerializer = configurationSerializer;
+ }
+
+ /**
+ * Launch the {@link EvaluatorShim}.
+ * @throws Exception if the {@link EvaluatorShim} cannot be instantiated or started.
+ */
+ public void launch() throws Exception {
+ final Injector injector = Tang.Factory.getTang().newInjector(readConfigurationFromDisk(this.configurationFilePath));
+ final EvaluatorShim evaluatorShim = injector.getInstance(EvaluatorShim.class);
+ evaluatorShim.run();
+ }
+
+ private Configuration readConfigurationFromDisk(final String configPath) {
+
+ LOG.log(Level.FINER, "Loading configuration file: {0}", configPath);
+
+ final File shimConfigurationFile = new File(configPath);
+
+ if (!shimConfigurationFile.exists() || !shimConfigurationFile.canRead()) {
+ throw new RuntimeException(
+ "Configuration file " + configPath + " doesn't exist or is not readable.",
+ new IOException(configPath));
+ }
+
+ try {
+ final Configuration config = this.configurationSerializer.fromFile(shimConfigurationFile);
+ LOG.log(Level.FINEST, "Configuration file loaded: {0}", configPath);
+ return config;
+ } catch (final IOException e) {
+ throw new RuntimeException("Unable to parse the configuration file: " + configPath, e);
+ }
+ }
+
+ private static Configuration parseCommandLine(final String[] args) {
+ if (args.length != 1) {
+ throw new RuntimeException("Expected configuration file name as an argument.");
+ }
+
+ final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+ confBuilder.bindNamedParameter(EvaluatorShimConfigFilePath.class, args[0]);
+
+ return confBuilder.build();
+ }
+
+ /**
+ * The starting point of the evaluator shim launcher.
+ */
+ public static void main(final String[] args) throws Exception {
+ LOG.log(Level.INFO, "Entering EvaluatorShimLauncher.main().");
+
+ final Injector injector = Tang.Factory.getTang().newInjector(parseCommandLine(args));
+ final EvaluatorShimLauncher launcher = injector.getInstance(EvaluatorShimLauncher.class);
+ launcher.launch();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java
new file mode 100644
index 0000000..cfa7b1b
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Evaluator-side implementation of the REEF runtime for Azure Batch.
+ */
+package org.apache.reef.runtime.azbatch.evaluator;
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java
new file mode 100644
index 0000000..e10dbd6
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * A REEF runtime for Azure Batch.
+ */
+package org.apache.reef.runtime.azbatch;
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java
new file mode 100644
index 0000000..2efe842
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The Azure Batch account key.
+ */
+@NamedParameter(doc = "The Azure Batch account key.")
+public final class AzureBatchAccountKey implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java
new file mode 100644
index 0000000..09f49d7
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The Azure Batch account name.
+ */
+@NamedParameter(doc = "The Azure Batch account name.")
+public final class AzureBatchAccountName implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java
new file mode 100644
index 0000000..1f2ec54
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Name;
+
+/**
+ * The Azure Batch account URI.
+ */
+@NamedParameter(doc = "The Azure Batch account URI.")
+public final class AzureBatchAccountUri implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java
new file mode 100644
index 0000000..88137f8
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The Azure Batch pool ID.
+ */
+@NamedParameter(doc = "The Azure Batch pool ID.")
+public final class AzureBatchPoolId implements Name<String> {
+}