You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:18 UTC
[45/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
new file mode 100644
index 0000000..bccf091
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.checkpoint.CheckpointID;
+import org.apache.reef.io.checkpoint.CheckpointNamingService;
+import org.apache.reef.io.checkpoint.CheckpointService;
+import org.apache.reef.io.checkpoint.RandomNameCNS;
+import org.apache.reef.tang.ExternalConstructor;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+/**
+ * ConfigurationModule for the {@link FSCheckpointService}.
+ * This can be used to create Evaluator-side configurations of the checkpointing service.
+ */
+@DriverSide
+@Public
+public class FSCheckPointServiceConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * Use local file system if true; otherwise, use HDFS.
+ */
+ public static final RequiredParameter<Boolean> IS_LOCAL = new RequiredParameter<>();
+
+ /**
+ * Path to be used to store the checkpoints on file system.
+ */
+ public static final RequiredParameter<String> PATH = new RequiredParameter<>();
+
+ /**
+ * Replication factor to be used for the checkpoints.
+ */
+ public static final OptionalParameter<Short> REPLICATION_FACTOR = new OptionalParameter<>();
+
+ /**
+ * Prefix for checkpoint files (optional).
+ */
+ public static final OptionalParameter<String> PREFIX = new OptionalParameter<>();
+ /**
+ * The ConfigurationModule to fill out. It binds the file-system based implementations of the
+ * checkpoint interfaces and maps the parameters declared above onto the named parameters
+ * of FileSystemConstructor, FSCheckpointService and RandomNameCNS.
+ */
+ public static final ConfigurationModule CONF = new FSCheckPointServiceConfiguration()
+ .bindImplementation(CheckpointService.class, FSCheckpointService.class) // Use the file-system based checkpoints
+ .bindImplementation(CheckpointNamingService.class, RandomNameCNS.class) // Use Random Names for the checkpoints
+ .bindImplementation(CheckpointID.class, FSCheckpointID.class)
+ .bindConstructor(FileSystem.class, FileSystemConstructor.class)
+ .bindNamedParameter(FileSystemConstructor.IS_LOCAL.class, IS_LOCAL)
+ .bindNamedParameter(FSCheckpointService.PATH.class, PATH)
+ .bindNamedParameter(FSCheckpointService.REPLICATION_FACTOR.class, REPLICATION_FACTOR)
+ .bindNamedParameter(RandomNameCNS.PREFIX.class, PREFIX)
+ .build();
+
+ /**
+ * Constructor for Hadoop FileSystem instances.
+ * This assumes that Hadoop Configuration is in the CLASSPATH.
+ */
+ public static class FileSystemConstructor implements ExternalConstructor<FileSystem> {
+
+ /**
+ * If false, use default values for Hadoop configuration; otherwise, load from config file.
+ * Set to false when REEF is running in local mode.
+ */
+ private final boolean loadConfig;
+
+ /**
+ * @param isLocal value of the IS_LOCAL named parameter; the Hadoop configuration
+ * is loaded only when this is false.
+ */
+ @Inject
+ public FileSystemConstructor(final @Parameter(IS_LOCAL.class) boolean isLocal) {
+ this.loadConfig = !isLocal;
+ }
+
+ /**
+ * Obtains a FileSystem for a new Hadoop Configuration created with the loadConfig flag.
+ *
+ * @return the FileSystem returned by FileSystem.get().
+ * @throws RuntimeException wrapping the IOException if the FileSystem cannot be obtained.
+ */
+ @Override
+ public FileSystem newInstance() {
+ try {
+ return FileSystem.get(new Configuration(this.loadConfig));
+ } catch (final IOException ex) {
+ throw new RuntimeException("Unable to create a FileSystem instance." +
+ " Probably Hadoop configuration is not in the CLASSPATH", ex);
+ }
+ }
+
+ /**
+ * Named parameter read by the FileSystemConstructor constructor.
+ */
+ @NamedParameter(doc = "Use local file system if true; otherwise, use HDFS.")
+ static class IS_LOCAL implements Name<Boolean> {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
new file mode 100644
index 0000000..784e026
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint.fs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.reef.io.checkpoint.CheckpointID;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A FileSystem based checkpoint ID contains reference to the Path
+ * where the checkpoint has been saved.
+ * <p>
+ * An instance created with the no-argument constructor has no path until
+ * {@link #readFields(DataInput)} has been called. {@link #toString()},
+ * {@link #equals(Object)} and {@link #hashCode()} tolerate that state.
+ */
+public class FSCheckpointID implements CheckpointID {
+
+  /** Location of the checkpoint; null until set by a constructor or readFields(). */
+  private Path path;
+
+  /**
+   * Creates an ID without a path, to be populated via {@link #readFields(DataInput)}.
+   */
+  public FSCheckpointID() {
+  }
+
+  /**
+   * @param path the location where the checkpoint has been saved.
+   */
+  public FSCheckpointID(Path path) {
+    this.path = path;
+  }
+
+  /**
+   * @return the location of the checkpoint, or null if it has not been set yet.
+   */
+  public Path getPath() {
+    return path;
+  }
+
+  /**
+   * @return the string form of the path, or "null" if the path has not been set yet.
+   */
+  @Override
+  public String toString() {
+    // String.valueOf() instead of path.toString(): the path may still be unset.
+    return String.valueOf(path);
+  }
+
+  /**
+   * Writes the path as a string.
+   *
+   * @throws IOException if writing to the output fails.
+   * @throws NullPointerException if the path has not been set yet.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, path.toString());
+  }
+
+  /**
+   * Replaces the path with the one read as a string from the input.
+   *
+   * @throws IOException if reading from the input fails.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.path = new Path(Text.readString(in));
+  }
+
+  /**
+   * Two IDs are equal when they refer to equal paths (or both have no path yet).
+   */
+  @Override
+  public boolean equals(Object other) {
+    return other instanceof FSCheckpointID
+        && java.util.Objects.equals(path, ((FSCheckpointID) other).path);
+  }
+
+  /**
+   * @return the hash code of the path, or 0 if the path has not been set yet.
+   */
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hashCode(path);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
new file mode 100644
index 0000000..a3fabfb
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.io.checkpoint.CheckpointID;
+import org.apache.reef.io.checkpoint.CheckpointNamingService;
+import org.apache.reef.io.checkpoint.CheckpointService;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A FileSystem based CheckpointService.
+ * <p>
+ * A checkpoint is written to a temporary file ({@code <name>.tmp}) located next to its
+ * final destination. {@link #commit(CheckpointWriteChannel)} renames the temporary file
+ * to the final name; {@link #abort(CheckpointWriteChannel)} deletes it instead.
+ */
+public class FSCheckpointService implements CheckpointService {
+
+  private final Path base;
+  private final FileSystem fs;
+  private final CheckpointNamingService namingPolicy;
+  private final short replication;
+
+  /**
+   * @param fs           the file system the checkpoints are stored on.
+   * @param basePath     the directory the checkpoint names are resolved against.
+   * @param namingPolicy source of the names for new checkpoints.
+   * @param replication  the replication factor passed to the file system on file creation.
+   */
+  @Inject
+  FSCheckpointService(final FileSystem fs,
+                      final @Parameter(PATH.class) String basePath,
+                      final CheckpointNamingService namingPolicy,
+                      final @Parameter(REPLICATION_FACTOR.class) short replication) {
+    this.fs = fs;
+    this.base = new Path(basePath);
+    this.namingPolicy = namingPolicy;
+    this.replication = replication;
+  }
+
+  /**
+   * @param fs           the file system the checkpoints are stored on.
+   * @param base         the directory the checkpoint names are resolved against.
+   * @param namingPolicy source of the names for new checkpoints.
+   * @param replication  the replication factor passed to the file system on file creation.
+   */
+  public FSCheckpointService(final FileSystem fs,
+                             final Path base,
+                             final CheckpointNamingService namingPolicy,
+                             final short replication) {
+    this.fs = fs;
+    this.base = base;
+    this.namingPolicy = namingPolicy;
+    this.replication = replication;
+  }
+
+  /**
+   * @param p the final destination of a checkpoint.
+   * @return the temporary file for that destination: same parent, name suffixed with ".tmp".
+   */
+  static final Path tmpfile(final Path p) {
+    return new Path(p.getParent(), p.getName() + ".tmp");
+  }
+
+  /**
+   * Opens a channel for writing a new checkpoint. The name is obtained from the
+   * naming service and resolved against the base path.
+   *
+   * @return a channel that writes to the temporary file of the new checkpoint.
+   * @throws IOException if the generated name is an absolute path or the file cannot be created.
+   */
+  public CheckpointWriteChannel create()
+      throws IOException {
+
+    final String name = namingPolicy.getNewName();
+
+    final Path p = new Path(name);
+    if (p.isUriPathAbsolute()) {
+      throw new IOException("Checkpoint cannot be an absolute path");
+    }
+    return createInternal(new Path(base, p));
+  }
+
+  /**
+   * @param name the final destination of the checkpoint.
+   * @return a channel that writes to the temporary file belonging to {@code name}.
+   * @throws IOException if the temporary file cannot be created.
+   */
+  CheckpointWriteChannel createInternal(Path name) throws IOException {
+
+    // Create the temp file. NOTE: FileSystem.create(Path, short) overwrites an
+    // existing file; it does not fail if the temp file is already there.
+    return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), replication));
+  }
+
+  /**
+   * Opens a committed checkpoint for reading.
+   *
+   * @param id the ID of the checkpoint; must be an {@link FSCheckpointID}.
+   * @return a channel reading from the path stored in the ID.
+   * @throws IllegalArgumentException if {@code id} is not an {@link FSCheckpointID}.
+   * @throws IOException if the file cannot be opened.
+   */
+  @Override
+  public CheckpointReadChannel open(final CheckpointID id)
+      throws IOException, InterruptedException {
+    return new FSCheckpointReadChannel(fs.open(asFsCheckpointId(id).getPath()));
+  }
+
+  /**
+   * Closes the channel if it is still open and promotes its temporary file to
+   * the final destination.
+   *
+   * @param ch a channel obtained from {@link #create()}.
+   * @return the ID referring to the final destination.
+   * @throws IOException if the rename fails. A failure of the subsequent clean-up
+   *                     is attached to that exception as a suppressed exception.
+   */
+  @Override
+  public CheckpointID commit(final CheckpointWriteChannel ch) throws IOException,
+      InterruptedException {
+    if (ch.isOpen()) {
+      ch.close();
+    }
+    final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
+    final Path dst = hch.getDestination();
+    final Path tmp = tmpfile(dst);
+    if (!fs.rename(tmp, dst)) {
+      final IOException failure =
+          new IOException("Failed to promote checkpoint " + tmp + " -> " + dst);
+      try {
+        // attempt to clean up
+        abort(ch);
+      } catch (final IOException cleanupFailure) {
+        // Report the failed promotion; do not let the clean-up failure hide it.
+        failure.addSuppressed(cleanupFailure);
+      }
+      throw failure;
+    }
+    return new FSCheckpointID(dst);
+  }
+
+  /**
+   * Closes the channel if it is still open and deletes its temporary file.
+   *
+   * @param ch a channel obtained from {@link #create()}.
+   * @throws IOException if the file system reports that the temporary file was not deleted.
+   */
+  @Override
+  public void abort(final CheckpointWriteChannel ch) throws IOException {
+    if (ch.isOpen()) {
+      ch.close();
+    }
+    final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
+    final Path tmp = tmpfile(hch.getDestination());
+    try {
+      if (!fs.delete(tmp, false)) {
+        throw new IOException("Failed to delete checkpoint during abort");
+      }
+    } catch (final FileNotFoundException ignored) {
+      // The temporary file is already gone: nothing left to clean up.
+    }
+  }
+
+  /**
+   * Deletes a committed checkpoint.
+   *
+   * @param id the ID of the checkpoint; must be an {@link FSCheckpointID}.
+   * @return the result of the file system's delete, or true if the file system
+   * reported the file as not found.
+   * @throws IllegalArgumentException if {@code id} is not an {@link FSCheckpointID}.
+   */
+  @Override
+  public boolean delete(final CheckpointID id) throws IOException,
+      InterruptedException {
+    final Path path = asFsCheckpointId(id).getPath();
+    try {
+      return fs.delete(path, false);
+    } catch (final FileNotFoundException ignored) {
+      // The checkpoint is already gone, which is the requested end state.
+      return true;
+    }
+  }
+
+  /**
+   * Narrows a CheckpointID to the type handled by this service.
+   *
+   * @throws IllegalArgumentException if {@code id} is null or of another type.
+   */
+  private static FSCheckpointID asFsCheckpointId(final CheckpointID id) {
+    if (!(id instanceof FSCheckpointID)) {
+      throw new IllegalArgumentException(
+          "Mismatched checkpoint type: " + (id == null ? null : id.getClass()));
+    }
+    return (FSCheckpointID) id;
+  }
+
+  @NamedParameter(doc = "The path to be used to store the checkpoints.")
+  static class PATH implements Name<String> {
+  }
+
+  @NamedParameter(doc = "The replication factor to be used for the stored checkpoints", default_value = "3")
+  static class REPLICATION_FACTOR implements Name<Short> {
+  }
+
+  /**
+   * Write channel over the temporary file of a checkpoint; remembers the final destination.
+   */
+  private static class FSCheckpointWriteChannel
+      implements CheckpointWriteChannel {
+    private final Path finalDst;
+    private final WritableByteChannel out;
+    private boolean isOpen = true;
+
+    FSCheckpointWriteChannel(final Path finalDst, final FSDataOutputStream out) {
+      this.finalDst = finalDst;
+      this.out = Channels.newChannel(out);
+    }
+
+    public int write(final ByteBuffer b) throws IOException {
+      return out.write(b);
+    }
+
+    /**
+     * @return the path the checkpoint is renamed to on commit.
+     */
+    public Path getDestination() {
+      return finalDst;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Marked closed first, so the channel reports closed even if close() below throws.
+      isOpen = false;
+      out.close();
+    }
+
+    @Override
+    public boolean isOpen() {
+      return isOpen;
+    }
+
+  }
+
+  /**
+   * Read channel over the file of a committed checkpoint.
+   */
+  private static class FSCheckpointReadChannel
+      implements CheckpointReadChannel {
+
+    private final ReadableByteChannel in;
+    private boolean isOpen = true;
+
+    FSCheckpointReadChannel(final FSDataInputStream in) {
+      this.in = Channels.newChannel(in);
+    }
+
+    @Override
+    public int read(final ByteBuffer bb) throws IOException {
+      return in.read(bb);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Marked closed first, so the channel reports closed even if close() below throws.
+      isOpen = false;
+      in.close();
+    }
+
+    @Override
+    public boolean isOpen() {
+      return isOpen;
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/maven-eclipse.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/maven-eclipse.xml b/lang/java/reef-common/maven-eclipse.xml
new file mode 100644
index 0000000..6c6b5ae
--- /dev/null
+++ b/lang/java/reef-common/maven-eclipse.xml
@@ -0,0 +1,28 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project default="copy-resources">
+  <target name="init"/>
+  <!-- Copies the XML and properties files of src/main/conf to META-INF/conf in the class folder. -->
+  <target name="copy-resources" depends="init">
+    <copy todir="target/classes/META-INF/conf" filtering="false">
+      <!-- Ant include patterns are comma- or space-separated; '|' is not a separator. -->
+      <fileset dir="src/main/conf" includes="*.xml,*.properties" excludes="**/*.java"/>
+    </copy>
+  </target>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/pom.xml b/lang/java/reef-common/pom.xml
new file mode 100644
index 0000000..c2d5ee0
--- /dev/null
+++ b/lang/java/reef-common/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-project</artifactId>
+ <version>0.11.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>reef-common</artifactId>
+ <name>REEF Common</name>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-sources</id>
+ <phase>generate-sources</phase>
+ <configuration>
+ <tasks>
+ <mkdir dir="target/generated-sources/proto"/>
+ <exec executable="protoc">
+ <arg value="--proto_path=src/main/proto/"/>
+ <arg value="--java_out=target/generated-sources/proto"/>
+ <arg value="src/main/proto/reef_service_protos.proto"/>
+ <arg value="src/main/proto/evaluator_runtime.proto"/>
+ <arg value="src/main/proto/client_runtime.proto"/>
+ <arg value="src/main/proto/driver_runtime.proto"/>
+ <arg value="src/main/proto/reef_protocol.proto"/>
+ </exec>
+ </tasks>
+ <sourceRoot>target/generated-sources/proto</sourceRoot>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/proto</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <includes>
+ <include>version.properties</include>
+ </includes>
+ <filtering>true</filtering>
+ </resource>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <excludes>
+ <exclude>version.properties</exclude>
+ </excludes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-annotations</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>wake</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tang</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>net.jcip</groupId>
+ <artifactId>jcip-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/conf/log4j.properties b/lang/java/reef-common/src/main/conf/log4j.properties
new file mode 100644
index 0000000..53d72e2
--- /dev/null
+++ b/lang/java/reef-common/src/main/conf/log4j.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#### Use two appenders, one to log to console, another to log to a file
+log4j.rootCategory=info, stdout, R
+
+#### First appender writes to console
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+# Pattern to output the caller's file name and line number.
+log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F%L) - %m%n
+
+#### Second appender writes to a file
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.File=D:\\log\\import.log
+
+# Control the maximum log file size
+log4j.appender.R.MaxFileSize=10000KB
+# Archive log files (keep up to 10 backup files)
+log4j.appender.R.MaxBackupIndex=10
+
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/conf/reef-site.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/conf/reef-site.xml b/lang/java/reef-common/src/main/conf/reef-site.xml
new file mode 100644
index 0000000..334e913
--- /dev/null
+++ b/lang/java/reef-common/src/main/conf/reef-site.xml
@@ -0,0 +1,21 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+-->
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java
new file mode 100644
index 0000000..3bdb992
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.client.parameters.*;
+import org.apache.reef.runtime.common.client.parameters.ClientPresent;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalImpl;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * A ConfigurationModule to fill out for the client configuration.
+ */
+public final class ClientConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * Event handler for messages from the running job.
+ * Default implementation just writes message to the log.
+ * A message contains a status and a client-defined message payload.
+ */
+ public static final OptionalImpl<EventHandler<JobMessage>> ON_JOB_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Handler for the event when a submitted REEF Job is running.
+ * Default implementation just writes to the log.
+ */
+ public static final OptionalImpl<EventHandler<RunningJob>> ON_JOB_RUNNING = new OptionalImpl<>();
+
+ /**
+ * Handler for the event when a submitted REEF Job is completed.
+ * Default implementation just writes to the log.
+ */
+ public static final OptionalImpl<EventHandler<CompletedJob>> ON_JOB_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Handler for the event when a submitted REEF Job has failed.
+ * Default implementation logs an error and rethrows the exception in the client JVM.
+ */
+ public static final OptionalImpl<EventHandler<FailedJob>> ON_JOB_FAILED = new OptionalImpl<>();
+
+ /**
+ * Receives fatal resource manager errors. The presence of this error means that the
+ * underlying REEF instance is no longer able to execute REEF jobs. The
+ * actual Jobs may or may not still be running.
+ * Default implementation logs an error and rethrows the exception in the client JVM.
+ */
+ public static final OptionalImpl<EventHandler<FailedRuntime>> ON_RUNTIME_ERROR = new OptionalImpl<>();
+
+ /**
+ * Error handler for events on Wake-spawned threads.
+ * Exceptions that are thrown on wake-spawned threads (e.g. in EventHandlers) will be caught by Wake and delivered to
+ * this handler. Default behavior is to log the exceptions and rethrow them as RuntimeExceptions.
+ */
+ public static final OptionalImpl<EventHandler<Throwable>> ON_WAKE_ERROR = new OptionalImpl<>();
+
+ /**
+ * The ConfigurationModule to fill out. It binds each of the optional handlers above to its
+ * named parameter, binds ClientPresent to ClientPresent.YES, and binds ON_WAKE_ERROR to
+ * RemoteConfiguration.ErrorHandler.
+ */
+ public static final ConfigurationModule CONF = new ClientConfiguration()
+ .bind(JobMessageHandler.class, ON_JOB_MESSAGE)
+ .bind(JobRunningHandler.class, ON_JOB_RUNNING)
+ .bind(JobCompletedHandler.class, ON_JOB_COMPLETED)
+ .bind(JobFailedHandler.class, ON_JOB_FAILED)
+ .bind(ResourceManagerErrorHandler.class, ON_RUNTIME_ERROR)
+ .bindNamedParameter(ClientPresent.class, ClientPresent.YES)
+ .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, ON_WAKE_ERROR)
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/CompletedJob.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/CompletedJob.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/CompletedJob.java
new file mode 100644
index 0000000..5f763c2
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/CompletedJob.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.naming.Identifiable;
+
+/**
+ * Client-side view of a REEF job that has completed.
+ */
+@Public
+@ClientSide
+@Provided
+public interface CompletedJob extends Identifiable {
+
+  /**
+   * Identifies the job this object refers to.
+   *
+   * @return the ID of the completed job.
+   */
+  @Override
+  String getId();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
new file mode 100644
index 0000000..5573fed
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.runtime.common.driver.DriverRuntimeConfiguration;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+/**
+ * A ConfigurationModule for Drivers.
+ * <p/>
+ * Declares the parameters and event handler slots of a Driver and binds each of them to its
+ * named parameter in {@link #CONF}, which is merged with DriverRuntimeConfiguration.CONF.
+ */
+@ClientSide
+@Public
+@Provided
+public final class DriverConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * Identifies the driver and therefore the JOB. Expect to see this e.g. on YARN's dashboard.
+ */
+ public static final OptionalParameter<String> DRIVER_IDENTIFIER = new OptionalParameter<>();
+
+ /**
+ * The amount of memory to be allocated for the Driver. This is the size of the AM container in YARN.
+ */
+ public static final OptionalParameter<Integer> DRIVER_MEMORY = new OptionalParameter<>();
+
+ /**
+ * Files to be made available on the Driver and all Evaluators.
+ */
+ public static final OptionalParameter<String> GLOBAL_FILES = new OptionalParameter<>();
+
+ /**
+ * Libraries to be made available on the Driver and all Evaluators.
+ */
+ public static final OptionalParameter<String> GLOBAL_LIBRARIES = new OptionalParameter<>();
+
+ /**
+ * Files to be made available on the Driver only.
+ */
+ public static final OptionalParameter<String> LOCAL_FILES = new OptionalParameter<>();
+
+ /**
+ * Libraries to be made available on the Driver only.
+ */
+ public static final OptionalParameter<String> LOCAL_LIBRARIES = new OptionalParameter<>();
+
+ /**
+ * Job submission directory to be used by the driver. This is the folder on the DFS used to stage the files
+ * for the Driver and subsequently for the Evaluators. It will be created if it doesn't exist yet.
+ * If this is set by the user, the user must ensure that it is unique across different jobs.
+ */
+ public static final OptionalParameter<String> DRIVER_JOB_SUBMISSION_DIRECTORY = new OptionalParameter<>();
+
+ /**
+ * The event handler invoked right after the driver boots up.
+ */
+ public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
+
+ /**
+ * This event is fired in place of the ON_DRIVER_STARTED when the Driver is in fact restarted after failure.
+ */
+ public static final OptionalImpl<EventHandler<StartTime>> ON_DRIVER_RESTARTED = new OptionalImpl<>();
+
+ /**
+ * The event handler invoked right before the driver shuts down. Defaults to ignore.
+ */
+ public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
+
+ // ***** EVALUATOR HANDLER BINDINGS:
+
+ /**
+ * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
+ */
+ public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>();
+
+ /**
+ * Event handler for completed evaluators. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed evaluators. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>();
+
+ // ***** TASK HANDLER BINDINGS:
+
+ /**
+ * Event handler for task messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
+ */
+ public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed tasks. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>();
+
+ /**
+ * Event handler for running tasks. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
+
+ /**
+ * Event handler for tasks still running on a previous evaluator when the driver restarted.
+ * Defaults to crash if not bound.
+ */
+ public static final OptionalImpl<EventHandler<RunningTask>> ON_DRIVER_RESTART_TASK_RUNNING = new OptionalImpl<>();
+
+ /**
+ * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
+ * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
+ */
+ public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>();
+
+ // ***** CLIENT HANDLER BINDINGS:
+
+ /**
+ * Event handler for client messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Event handler for close messages sent by the client. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<Void>> ON_CLIENT_CLOSED = new OptionalImpl<>();
+
+ /**
+ * Event handler for close messages sent by the client that carry a message (a byte[] payload).
+ * Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_CLOSED_MESSAGE = new OptionalImpl<>();
+
+ // ***** CONTEXT HANDLER BINDINGS:
+
+ /**
+ * Event handler for active context. Defaults to closing the context if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
+
+ /**
+ * Event handler for active context when the driver restarts. Defaults to closing the context if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ActiveContext>> ON_DRIVER_RESTART_CONTEXT_ACTIVE = new OptionalImpl<>();
+
+ /**
+ * Event handler for closed context. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed context. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>();
+
+ /**
+ * Event handler for context messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Number of threads allocated per evaluator to dispatch events from this Evaluator.
+ */
+ public static final OptionalParameter<Integer> EVALUATOR_DISPATCHER_THREADS = new OptionalParameter<>();
+
+ /**
+ * Event handler for the event of driver restart completion. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<DriverRestartCompleted>> ON_DRIVER_RESTART_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * ConfigurationModule to fill out to get a legal Driver Configuration.
+ * It is built on top of DriverRuntimeConfiguration.CONF and binds every parameter declared above.
+ */
+ public static final ConfigurationModule CONF = new DriverConfiguration().merge(DriverRuntimeConfiguration.CONF)
+
+ .bindNamedParameter(DriverIdentifier.class, DRIVER_IDENTIFIER)
+ .bindNamedParameter(DriverMemory.class, DRIVER_MEMORY)
+ .bindNamedParameter(DriverJobSubmissionDirectory.class, DRIVER_JOB_SUBMISSION_DIRECTORY)
+ .bindSetEntry(JobGlobalFiles.class, GLOBAL_FILES)
+ .bindSetEntry(JobGlobalLibraries.class, GLOBAL_LIBRARIES)
+ .bindSetEntry(DriverLocalFiles.class, LOCAL_FILES)
+ .bindSetEntry(DriverLocalLibraries.class, LOCAL_LIBRARIES)
+
+ // Driver start/stop handlers
+ .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED)
+ .bindNamedParameter(DriverRestartHandler.class, ON_DRIVER_RESTARTED)
+ // The clock's start handler is the runtime's own DriverStartHandler class, not the user-supplied handler.
+ // NOTE(review): presumably it dispatches to the start or the restart handler bound above - confirm.
+ .bindSetEntry(Clock.StartHandler.class, org.apache.reef.runtime.common.driver.DriverStartHandler.class)
+ .bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP)
+
+ // Evaluator handlers
+ .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED)
+ .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED)
+ .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED)
+
+ // Task handlers
+ .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING)
+ .bindSetEntry(DriverRestartTaskRunningHandlers.class, ON_DRIVER_RESTART_TASK_RUNNING)
+ .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED)
+ .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE)
+ .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED)
+ .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED)
+
+ // Context handlers
+ .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
+ .bindSetEntry(DriverRestartContextActiveHandlers.class, ON_DRIVER_RESTART_CONTEXT_ACTIVE)
+ .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED)
+ .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
+ .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED)
+
+ // Client handlers
+ .bindSetEntry(ClientMessageHandlers.class, ON_CLIENT_MESSAGE)
+ .bindSetEntry(ClientCloseHandlers.class, ON_CLIENT_CLOSED)
+ .bindSetEntry(ClientCloseWithMessageHandlers.class, ON_CLIENT_CLOSED_MESSAGE)
+
+ // Various parameters
+ .bindNamedParameter(EvaluatorDispatcherThreads.class, EVALUATOR_DISPATCHER_THREADS)
+ .bindSetEntry(DriverRestartCompletedHandlers.class, ON_DRIVER_RESTART_COMPLETED)
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
new file mode 100644
index 0000000..009fa24
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A launcher for REEF Drivers.
+ * <p/>
+ * It can be instantiated using a configuration that can create a REEF instance.
+ * For example, the local resourcemanager and the YARN resourcemanager can do this.
+ * <p/>
+ * The launcher submits a driver configuration and blocks the calling thread until the
+ * client-side event handlers declared below report that the job is done (or a timeout expires).
+ * All state transitions are made while holding this object's monitor; the status itself is
+ * volatile so that it can be read without locking.
+ * <p/>
+ * See {@code org.apache.reef.examples.hello.HelloREEF} for a demo use case.
+ */
+@Public
+@Provided
+@ClientSide
+@Unit
+public final class DriverLauncher {
+
+  private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName());
+  private final REEF reef;
+
+  /**
+   * Current job status. Written only while holding the monitor of this object;
+   * volatile because getStatus() and toString() read it without locking.
+   */
+  private volatile LauncherStatus status = LauncherStatus.INIT;
+
+  /**
+   * The job currently running, or null. Read and written only while holding the monitor of this object.
+   */
+  private RunningJob theJob = null;
+
+  @Inject
+  private DriverLauncher(final REEF reef) {
+    this.reef = reef;
+  }
+
+  /**
+   * Instantiate a launcher for the given Configuration.
+   *
+   * @param runtimeConfiguration the resourcemanager configuration to be used
+   * @return a DriverLauncher based on the given resourcemanager configuration
+   * @throws BindException      on configuration errors
+   * @throws InjectionException on configuration errors
+   */
+  public static DriverLauncher getLauncher(
+      final Configuration runtimeConfiguration) throws BindException, InjectionException {
+
+    // Register the inner handler classes of this @Unit as the client-side job event handlers.
+    final Configuration clientConfiguration = ClientConfiguration.CONF
+        .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class)
+        .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class)
+        .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class)
+        .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class)
+        .build();
+
+    return Tang.Factory.getTang()
+        .newInjector(runtimeConfiguration, clientConfiguration)
+        .getInstance(DriverLauncher.class);
+  }
+
+  /**
+   * Kills the running job.
+   * <p/>
+   * If the job is running, the status becomes FORCE_CLOSED. Any thread blocked in run() is woken up.
+   */
+  public synchronized void close() {
+    if (this.status.isRunning()) {
+      this.status = LauncherStatus.FORCE_CLOSED;
+    }
+    if (null != this.theJob) {
+      this.theJob.close();
+    }
+    // notifyAll(): every waiter re-checks the status in its own loop, so waking all of them is safe.
+    this.notifyAll();
+  }
+
+  /**
+   * Run a job. Waits indefinitely for the job to complete.
+   *
+   * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
+   * @return the state of the job after execution.
+   */
+  public LauncherStatus run(final Configuration driverConfig) {
+    this.reef.submit(driverConfig);
+    synchronized (this) {
+      while (!this.status.isDone()) {
+        try {
+          LOG.log(Level.FINE, "Wait indefinitely");
+          this.wait();
+        } catch (final InterruptedException ex) {
+          // Deliberately keep waiting: only a terminal job status ends this loop.
+          LOG.log(Level.FINE, "Interrupted: {0}", ex);
+        }
+      }
+    }
+    this.reef.close();
+    return this.status;
+  }
+
+  /**
+   * Run a job with a waiting timeout after which it will be killed, if it did not complete yet.
+   * <p/>
+   * The status is set to FORCE_CLOSED only if the job has not reached a terminal status when
+   * the timeout expires; a job that finished in time keeps the status it finished with.
+   *
+   * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
+   * @param timeOut      timeout on the job, in milliseconds.
+   * @return the state of the job after execution.
+   */
+  public LauncherStatus run(final Configuration driverConfig, final long timeOut) {
+    final long startTime = System.currentTimeMillis();
+    // Saturate rather than overflow, so that a huge timeout does not wrap into the past.
+    final long endTime = timeOut > Long.MAX_VALUE - startTime ? Long.MAX_VALUE : startTime + timeOut;
+    this.reef.submit(driverConfig);
+    synchronized (this) {
+      while (!this.status.isDone()) {
+        try {
+          final long waitTime = endTime - System.currentTimeMillis();
+          if (waitTime <= 0) {
+            break;
+          }
+          LOG.log(Level.FINE, "Wait for {0} milliSeconds", waitTime);
+          this.wait(waitTime);
+        } catch (final InterruptedException ex) {
+          // Deliberately keep waiting until the job is done or the deadline has passed.
+          LOG.log(Level.FINE, "Interrupted: {0}", ex);
+        }
+      }
+      // The loop ends either with a terminal status or because the deadline passed.
+      // Only the latter is a timeout; never overwrite a terminal status such as COMPLETED.
+      if (!this.status.isDone()) {
+        LOG.log(Level.WARNING, "The Job timed out.");
+        this.status = LauncherStatus.FORCE_CLOSED;
+      }
+    }
+
+    this.reef.close();
+    return this.status;
+  }
+
+  /**
+   * @return the current status of the job.
+   */
+  public LauncherStatus getStatus() {
+    return this.status;
+  }
+
+  /**
+   * Update job status and notify the waiting thread.
+   *
+   * @param status the new status of the job.
+   */
+  public synchronized void setStatusAndNotify(final LauncherStatus status) {
+    LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[]{this.status, status});
+    this.status = status;
+    this.notifyAll();
+  }
+
+  /**
+   * Records the job handle and the new status in one step under the monitor,
+   * so that close() never observes one without the other.
+   *
+   * @param job       the running job, or null if there is no running job anymore.
+   * @param newStatus the new status of the job.
+   */
+  private synchronized void setJobAndStatus(final RunningJob job, final LauncherStatus newStatus) {
+    this.theJob = job;
+    setStatusAndNotify(newStatus);
+  }
+
+  @Override
+  public String toString() {
+    return this.status.toString();
+  }
+
+  /**
+   * Job driver notifies us that the job is running.
+   */
+  public final class RunningJobHandler implements EventHandler<RunningJob> {
+    @Override
+    public void onNext(final RunningJob job) {
+      LOG.log(Level.INFO, "The Job {0} is running.", job.getId());
+      setJobAndStatus(job, LauncherStatus.RUNNING);
+    }
+  }
+
+  /**
+   * Job driver notifies us that the job had failed.
+   */
+  public final class FailedJobHandler implements EventHandler<FailedJob> {
+    @Override
+    public void onNext(final FailedJob job) {
+      final Optional<Throwable> ex = job.getReason();
+      // Unwrap the Optional: passing it as-is would select the log(Level, String, Object)
+      // overload and the exception would never be logged.
+      LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex.orElse(null));
+      setJobAndStatus(null, LauncherStatus.FAILED(ex));
+    }
+  }
+
+  /**
+   * Job driver notifies us that the job had completed successfully.
+   */
+  public final class CompletedJobHandler implements EventHandler<CompletedJob> {
+    @Override
+    public void onNext(final CompletedJob job) {
+      LOG.log(Level.INFO, "The Job {0} is done.", job.getId());
+      setJobAndStatus(null, LauncherStatus.COMPLETED);
+    }
+  }
+
+  /**
+   * Handles an error in the job driver.
+   */
+  public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
+    @Override
+    public void onNext(final FailedRuntime error) {
+      final Optional<Throwable> reason = error.getReason();
+      // Unwrap the Optional so that the Throwable overload of log() is used.
+      LOG.log(Level.SEVERE, "Received a resourcemanager error", reason.orElse(null));
+      setJobAndStatus(null, LauncherStatus.FAILED(reason));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
new file mode 100644
index 0000000..4cf452e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+/**
+ * Use this ConfigurationModule to configure Services to be run in the Driver.
+ * <p/>
+ * A service is a set of event handlers that are informed of events in addition to the event handlers defined in
+ * DriverConfiguration. However, most services will treat the events as read-only. Doing differently should be
+ * documented clearly in the Service documentation.
+ */
+@ClientSide
+@Public
+@Provided
+public final class DriverServiceConfiguration extends ConfigurationModuleBuilder {
+
+
+ /**
+ * Files to be made available on the Driver and all Evaluators.
+ */
+ public static final OptionalParameter<String> GLOBAL_FILES = new OptionalParameter<>();
+
+ /**
+ * Libraries to be made available on the Driver and all Evaluators.
+ */
+ public static final OptionalParameter<String> GLOBAL_LIBRARIES = new OptionalParameter<>();
+
+ /**
+ * Files to be made available on the Driver only.
+ */
+ public static final OptionalParameter<String> LOCAL_FILES = new OptionalParameter<>();
+
+ /**
+ * Libraries to be made available on the Driver only.
+ */
+ public static final OptionalParameter<String> LOCAL_LIBRARIES = new OptionalParameter<>();
+
+ /**
+ * The event handler invoked right after the driver boots up.
+ */
+ public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
+
+ /**
+ * The event handler invoked right before the driver shuts down. Defaults to ignore.
+ */
+ public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
+
+ // ***** EVALUATOR HANDLER BINDINGS:
+
+ /**
+ * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
+ */
+ public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>();
+
+ /**
+ * Event handler for completed evaluators. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed evaluators. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>();
+
+ // ***** TASK HANDLER BINDINGS:
+
+ /**
+ * Event handler for task messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
+ */
+ public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed tasks. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>();
+
+ /**
+ * Event handler for running tasks. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
+
+ /**
+ * Event handler for tasks still running on a previous evaluator when the driver restarted.
+ * Defaults to logging if not bound.
+ * NOTE(review): DriverConfiguration documents the default of its handler for the same named parameter
+ * (DriverRestartTaskRunningHandlers) as "crash" - confirm which description is correct.
+ */
+ public static final OptionalImpl<EventHandler<RunningTask>> ON_DRIVER_RESTART_TASK_RUNNING = new OptionalImpl<>();
+
+ /**
+ * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
+ * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
+ */
+ public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>();
+
+
+ // ***** CONTEXT HANDLER BINDINGS:
+
+ /**
+ * Event handler for active context. Defaults to closing the context if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
+
+ /**
+ * Event handler for active context when the driver restarts. Defaults to closing the context if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ActiveContext>> ON_DRIVER_RESTART_CONTEXT_ACTIVE = new OptionalImpl<>();
+
+ /**
+ * Event handler for closed context. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed context. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>();
+
+ /**
+ * Event handler for context messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
+
+
+ /**
+ * ConfigurationModule to fill out to configure a Service to be run in the Driver.
+ * Most handlers are bound to Service* named parameters rather than to the ones used by DriverConfiguration.
+ */
+ public static final ConfigurationModule CONF = new DriverServiceConfiguration()
+ // Files use the very same named parameters as the DriverConfiguration
+ .bindSetEntry(JobGlobalFiles.class, GLOBAL_FILES)
+ .bindSetEntry(JobGlobalLibraries.class, GLOBAL_LIBRARIES)
+ .bindSetEntry(DriverLocalFiles.class, LOCAL_FILES)
+ .bindSetEntry(DriverLocalLibraries.class, LOCAL_LIBRARIES)
+
+ // Start and stop events are the same handlers for applications and services.
+ .bindSetEntry(Clock.StartHandler.class, ON_DRIVER_STARTED)
+ .bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP)
+
+ // Evaluator handlers
+ .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED)
+ .bindSetEntry(ServiceEvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED)
+ .bindSetEntry(ServiceEvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED)
+
+ // Task handlers
+ .bindSetEntry(ServiceTaskRunningHandlers.class, ON_TASK_RUNNING)
+ .bindSetEntry(DriverRestartTaskRunningHandlers.class, ON_DRIVER_RESTART_TASK_RUNNING)
+ .bindSetEntry(ServiceTaskFailedHandlers.class, ON_TASK_FAILED)
+ .bindSetEntry(ServiceTaskMessageHandlers.class, ON_TASK_MESSAGE)
+ .bindSetEntry(ServiceTaskCompletedHandlers.class, ON_TASK_COMPLETED)
+ .bindSetEntry(ServiceTaskSuspendedHandlers.class, ON_TASK_SUSPENDED)
+
+ // Context handlers
+ .bindSetEntry(ServiceContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
+ .bindSetEntry(DriverRestartContextActiveHandlers.class, ON_DRIVER_RESTART_CONTEXT_ACTIVE)
+ .bindSetEntry(ServiceContextClosedHandlers.class, ON_CONTEXT_CLOSED)
+ .bindSetEntry(ServiceContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
+ .bindSetEntry(ServiceContextFailedHandlers.class, ON_CONTEXT_FAILED)
+
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/FailedJob.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/FailedJob.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/FailedJob.java
new file mode 100644
index 0000000..06757ce
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/FailedJob.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.common.AbstractFailure;
+import org.apache.reef.util.Optional;
+
+/**
+ * The error that the REEF Client receives when a REEF job fails because of a user error.
+ */
+@Public
+@ClientSide
+@Provided
+public final class FailedJob extends AbstractFailure {
+
+  /**
+   * Creates the failure record of a job; all arguments are handed to AbstractFailure unchanged.
+   *
+   * @param id          Identifier of the Job that produced the error.
+   * @param message     One-line error message.
+   * @param description Long error description.
+   * @param cause       Java Exception that caused the error.
+   * @param data        byte array that contains serialized version of the error.
+   */
+  public FailedJob(
+      final String id,
+      final String message,
+      final Optional<String> description,
+      final Optional<Throwable> cause,
+      final Optional<byte[]> data) {
+    super(id, message, description, cause, data);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/FailedRuntime.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/FailedRuntime.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/FailedRuntime.java
new file mode 100644
index 0000000..8cae76f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/FailedRuntime.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.common.AbstractFailure;
+import org.apache.reef.proto.ReefServiceProtos.RuntimeErrorProto;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Error message that REEF Client gets when there is an error in REEF resourcemanager.
+ * <p/>
+ * The failure is built from a RuntimeErrorProto; the exception it carries, if any, is decoded
+ * on a best-effort basis and becomes the (optional) cause of this failure.
+ */
+public final class FailedRuntime extends AbstractFailure {
+
+  /**
+   * Standard java logger, named after this class.
+   */
+  private static final Logger LOG = Logger.getLogger(FailedRuntime.class.getName());
+
+  /**
+   * Codec to decode serialized exception from a byte array. It is used in getThrowable().
+   */
+  private static final ObjectSerializableCodec<Exception> CODEC = new ObjectSerializableCodec<>();
+
+  /**
+   * Create a new Failure object out of protobuf data.
+   * <p/>
+   * The cause is empty when the message carries no exception or the exception cannot be decoded.
+   *
+   * @param error Error message as a protocol buffers object.
+   */
+  public FailedRuntime(final RuntimeErrorProto error) {
+    // getThrowable() returns null when there is nothing to decode, hence ofNullable() and not of().
+    super(error.getIdentifier(), error.getMessage(), Optional.<String>empty(),
+        Optional.ofNullable(getThrowable(error)), Optional.<byte[]>empty());
+  }
+
+  /**
+   * Retrieve Java exception from protobuf object, if possible. Otherwise, return null.
+   * This is a utility method used in the FailedRuntime constructor.
+   *
+   * @param error protobuf error message structure.
+   * @return Java exception or null if exception is missing or cannot be decoded.
+   */
+  private static Throwable getThrowable(final RuntimeErrorProto error) {
+    final byte[] data = getData(error);
+    if (data != null) {
+      try {
+        return CODEC.decode(data);
+      } catch (final Throwable ex) {
+        // Best effort: a payload that cannot be decoded must not prevent reporting the failure itself.
+        LOG.log(Level.FINE, "Could not decode exception {0}: {1}", new Object[]{error, ex});
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get binary data for the exception, if it exists. Otherwise, return null.
+   * This is a utility method used in the getThrowable() method.
+   *
+   * @param error protobuf error message structure.
+   * @return byte array of the exception or null if exception is missing.
+   */
+  private static byte[] getData(final RuntimeErrorProto error) {
+    return error.hasException() ? error.getException().toByteArray() : null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/JobMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/JobMessage.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/JobMessage.java
new file mode 100644
index 0000000..ea929b9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/JobMessage.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.Message;
+import org.apache.reef.io.naming.Identifiable;
+
+/**
+ * A message received by the client from the driver.
+ * <p>
+ * Holds the identifier of the sending Job together with the raw message bytes.
+ */
+@Public
+@ClientSide
+@Provided
+public final class JobMessage implements Message, Identifiable {
+
+  private final String id;
+  private final byte[] value;
+
+  /**
+   * @param id    the identifier of the sending Job
+   * @param value the message; stored as given, without copying or null checks
+   */
+  public JobMessage(final String id, final byte[] value) {
+    this.id = id;
+    this.value = value;
+  }
+
+  /**
+   * Get the message sent by the Job.
+   *
+   * @return the message sent by the Job. This is the array handed to the constructor, not a copy.
+   */
+  @Override
+  public final byte[] get() {
+    return this.value;
+  }
+
+  /**
+   * Get the Identifier of the sending Job.
+   *
+   * @return the Identifier of the sending Job.
+   */
+  @Override
+  public final String getId() {
+    return this.id;
+  }
+
+  /**
+   * @return a short description containing the Job id and the length of the message.
+   */
+  @Override
+  public String toString() {
+    // The constructor does not reject a null payload, so guard the length lookup
+    // instead of letting toString() fail with a NullPointerException.
+    final String length = this.value == null ? "null" : String.valueOf(this.value.length);
+    return "JobMessage{" +
+        "id='" + this.id + '\'' +
+        ", value.length=" + length +
+        '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java
new file mode 100644
index 0000000..b35a008
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.util.Optional;
+
+/**
+ * The status of a reef job spawned using the DriverLauncher class.
+ * <p>
+ * Two statuses are considered equal when they are in the same state; the attached error,
+ * if any, is ignored by both equals() and hashCode().
+ */
+public final class LauncherStatus {
+
+  public static final LauncherStatus INIT = new LauncherStatus(State.INIT);
+  public static final LauncherStatus RUNNING = new LauncherStatus(State.RUNNING);
+  public static final LauncherStatus COMPLETED = new LauncherStatus(State.COMPLETED);
+  public static final LauncherStatus FORCE_CLOSED = new LauncherStatus(State.FORCE_CLOSED);
+  public static final LauncherStatus FAILED = new LauncherStatus(State.FAILED);
+
+  private final State state;
+  private final Optional<Throwable> error;
+
+  /**
+   * Create a status without an error.
+   *
+   * @param state the state of the job.
+   */
+  private LauncherStatus(final State state) {
+    this(state, null);
+  }
+
+  /**
+   * Create a status with an optional error.
+   *
+   * @param state the state of the job.
+   * @param ex    the error to attach; may be null, in which case getError() is empty.
+   */
+  private LauncherStatus(final State state, final Throwable ex) {
+    this.state = state;
+    this.error = Optional.ofNullable(ex);
+  }
+
+  /**
+   * Create a FAILED status carrying the given error.
+   *
+   * @param ex the cause of the failure; may be null.
+   * @return a new status in the FAILED state.
+   */
+  public static final LauncherStatus FAILED(final Throwable ex) {
+    return new LauncherStatus(State.FAILED, ex);
+  }
+
+  /**
+   * Create a FAILED status carrying the given error, if present.
+   *
+   * @param ex the optional cause of the failure.
+   * @return a new status in the FAILED state.
+   */
+  public static final LauncherStatus FAILED(final Optional<Throwable> ex) {
+    return new LauncherStatus(State.FAILED, ex.orElse(null));
+  }
+
+  /**
+   * @return the error attached to this status, or an empty Optional if there is none.
+   */
+  public Optional<Throwable> getError() {
+    return this.error;
+  }
+
+  /**
+   * Compare the <b>State</b> of two LauncherStatus objects.
+   * Note that it does NOT compare the exceptions - just the states.
+   *
+   * @return True if both LauncherStatus objects are in the same state.
+   */
+  @Override
+  public boolean equals(final Object other) {
+    return this == other ||
+        (other instanceof LauncherStatus && ((LauncherStatus) other).state == this.state);
+  }
+
+  /**
+   * Hash code consistent with equals(): it is derived from the state only, so that
+   * statuses in the same state hash alike regardless of the attached error.
+   *
+   * @return the hash code of the state.
+   */
+  @Override
+  public int hashCode() {
+    return this.state.hashCode();
+  }
+
+  /**
+   * Has the job completed?
+   *
+   * @return True if the job has been completed, false otherwise.
+   */
+  public final boolean isDone() {
+    switch (this.state) {
+      case FAILED:
+      case COMPLETED:
+      case FORCE_CLOSED:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Has the job completed successfully?
+   *
+   * @return True if the job has been completed successfully, false otherwise.
+   */
+  public final boolean isSuccess() {
+    return this.state == State.COMPLETED;
+  }
+
+  /**
+   * Is the job still running?
+   *
+   * @return True if the job is still running, false otherwise.
+   */
+  public final boolean isRunning() {
+    return this.state == State.RUNNING;
+  }
+
+  /**
+   * @return the state name, followed by the error in parentheses when one is attached.
+   */
+  @Override
+  public String toString() {
+    if (this.error.isPresent()) {
+      return this.state + "(" + this.error.get() + ")";
+    } else {
+      return this.state.toString();
+    }
+  }
+
+  /**
+   * The state the computation could be in.
+   */
+  private enum State {
+    INIT,
+    RUNNING,
+    COMPLETED,
+    FAILED,
+    FORCE_CLOSED
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/REEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/REEF.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/REEF.java
new file mode 100644
index 0000000..5a783f0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/REEF.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.runtime.common.client.REEFImplementation;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * The main entry point into the REEF resourcemanager.
+ * <p>
+ * Each REEF resourcemanager supplies an implementation of this interface, and the client
+ * uses that instance to submit the Driver for execution. Like every submission in REEF,
+ * the Driver is described by a TANG Configuration object.
+ */
+@Public
+@ClientSide
+@DefaultImplementation(REEFImplementation.class)
+public interface REEF extends AutoCloseable {
+
+  /**
+   * Close the resourcemanager connection.
+   * Declared here without a throws clause, unlike AutoCloseable.close().
+   */
+  @Override
+  void close();
+
+  /**
+   * Submits the Driver set up in the given Configuration for execution.
+   * <p>
+   * The Configuration must bind the Driver interface to the concrete
+   * implementation to be used for the job at hand.
+   *
+   * @param driverConf The driver configuration: including everything it needs to execute. @see DriverConfiguration
+   */
+  void submit(Configuration driverConf);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/RunningJob.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/RunningJob.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/RunningJob.java
new file mode 100644
index 0000000..f29cb32
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/RunningJob.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.naming.Identifiable;
+import org.apache.reef.runtime.common.client.RunningJobImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * Client-side handle to a REEF job that is currently running.
+ */
+@Public
+@ClientSide
+@Provided
+@DefaultImplementation(RunningJobImpl.class)
+public interface RunningJob extends Identifiable, AutoCloseable {
+
+  /**
+   * Cancels the running Job.
+   * Declared here without a throws clause, unlike AutoCloseable.close().
+   */
+  @Override
+  void close();
+
+  /**
+   * Cancels the running Job and passes a message along with the request.
+   *
+   * @param message delivered along with cancel request.
+   */
+  void close(byte[] message);
+
+  /**
+   * @return the ID of the running job.
+   */
+  @Override
+  String getId();
+
+  /**
+   * Send a message to the Driver.
+   *
+   * @param message to send to the running driver
+   */
+  void send(byte[] message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/package-info.java
new file mode 100644
index 0000000..9962aa0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Client APIs for REEF. A Client in REEF is the program that submits a Driver to a resource manager.
+ * This submission is done via the REEF.submit() method which accepts a Driver Configuration.
+ */
+package org.apache.reef.client;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobCompletedHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobCompletedHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobCompletedHandler.java
new file mode 100644
index 0000000..3367ae0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobCompletedHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client.parameters;
+
+import org.apache.reef.client.CompletedJob;
+import org.apache.reef.runtime.common.client.defaults.DefaultCompletedJobHandler;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Named parameter for the client EventHandler that receives CompletedJob events.
+ * Defaults to DefaultCompletedJobHandler.
+ */
+@NamedParameter(
+    doc = "Event handler for CompletedJob",
+    default_classes = DefaultCompletedJobHandler.class)
+public final class JobCompletedHandler implements Name<EventHandler<CompletedJob>> {
+
+  /** Private and empty: this class declares no members and is never instantiated from outside. */
+  private JobCompletedHandler() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobFailedHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobFailedHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobFailedHandler.java
new file mode 100644
index 0000000..7dd186c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobFailedHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client.parameters;
+
+import org.apache.reef.client.FailedJob;
+import org.apache.reef.runtime.common.client.defaults.DefaultFailedJobHandler;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Named parameter for the client EventHandler that receives FailedJob events,
+ * i.e. is triggered on remote job failure. Defaults to DefaultFailedJobHandler.
+ */
+@NamedParameter(
+    doc = "Client EventHandler triggered on remote job failure.",
+    default_classes = DefaultFailedJobHandler.class)
+public final class JobFailedHandler implements Name<EventHandler<FailedJob>> {
+
+  /** Private and empty: this class declares no members and is never instantiated from outside. */
+  private JobFailedHandler() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobMessageHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobMessageHandler.java
new file mode 100644
index 0000000..7d0942c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobMessageHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client.parameters;
+
+import org.apache.reef.client.JobMessage;
+import org.apache.reef.runtime.common.client.defaults.DefaultJobMessageHandler;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Named parameter for the client EventHandler that receives JobMessage events,
+ * i.e. messages from the Driver. Defaults to DefaultJobMessageHandler.
+ */
+@NamedParameter(
+    doc = "Client EventHandler that gets messages from the Driver.",
+    default_classes = DefaultJobMessageHandler.class)
+public final class JobMessageHandler implements Name<EventHandler<JobMessage>> {
+
+  /** Private and empty: this class declares no members and is never instantiated from outside. */
+  private JobMessageHandler() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobRunningHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobRunningHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobRunningHandler.java
new file mode 100644
index 0000000..1de835c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobRunningHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client.parameters;
+
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.runtime.common.client.defaults.DefaultRunningJobHandler;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Named parameter for the client EventHandler that receives RunningJob events,
+ * i.e. is triggered when the REEF job is running. Defaults to DefaultRunningJobHandler.
+ */
+@NamedParameter(
+    doc = "Client EventHandler triggered when the REEF job is running.",
+    default_classes = DefaultRunningJobHandler.class)
+public final class JobRunningHandler implements Name<EventHandler<RunningJob>> {
+
+  /** Private and empty: this class declares no members and is never instantiated from outside. */
+  private JobRunningHandler() {
+  }
+}