You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:14 UTC

[41/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/Naming.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/Naming.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/Naming.java
new file mode 100644
index 0000000..8779b95
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/Naming.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.naming;
+
+/**
+ * Implementations of this interface facilitate address lookups based on
+ * Identifiers and allow to register and unregister addresses for Identifiers.
+ */
+public interface Naming extends NamingLookup, NamingRegistry {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingLookup.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingLookup.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingLookup.java
new file mode 100644
index 0000000..4fb7d15
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingLookup.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.naming;
+
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.wake.Identifier;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Implementations of this interface facilitate address lookups based on
+ * Identifiers.
+ */
+@Public
+public interface NamingLookup {
+
+  /**
+   * Lookup an Address for a given Identifier.
+   *
+   * @param id
+   * @return
+   * @throws IOException
+   */
+  public InetSocketAddress lookup(final Identifier id) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingRegistry.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingRegistry.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingRegistry.java
new file mode 100644
index 0000000..0d4f6c5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingRegistry.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.naming;
+
+import org.apache.reef.wake.Identifier;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Allows to register and unregister addresses for Identifiers.
+ */
+public interface NamingRegistry {
+
+  /**
+   * Register the given Address for the given Identifier
+   *
+   * @param id
+   * @param addr
+   * @throws Exception
+   */
+  public void register(final Identifier id, final InetSocketAddress addr) throws Exception;
+
+  /**
+   * Unregister the given Identifier from the registry
+   *
+   * @param id
+   * @throws Exception
+   */
+  public void unregister(final Identifier id) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/package-info.java
new file mode 100644
index 0000000..702f401
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * APIs for I/O in REEF: {@link org.apache.reef.io.Codec}s and {@link org.apache.reef.io.Serializer}s
+ */
+@Unstable package org.apache.reef.io;
+
+import org.apache.reef.annotations.Unstable;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Codec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Codec.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Codec.java
new file mode 100644
index 0000000..5e69d4f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Codec.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.serialization;
+
+/**
+ * Interface for serialization routines that translate back and forth between
+ * byte arrays with low latency. (Contrast to Serializer, Deserializer, which
+ * optimize for file size and throughput.)
+ *
+ * @param <T> The type of the objects (de-)serialized
+ */
+public interface Codec<T> {
+
+  /**
+   * Encodes the given object into a Byte Array
+   *
+   * @param obj
+   * @return a byte[] representation of the object
+   */
+  public byte[] encode(T obj);
+
+  /**
+   * Decodes the given byte array into an object
+   *
+   * @param buf
+   * @return the decoded object
+   */
+  public T decode(byte[] buf);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Deserializer.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Deserializer.java
new file mode 100644
index 0000000..25ffe0a
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Deserializer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.serialization;
+
+/**
+ * Stream-based multi-object deserialization interface. This class wraps an
+ * instance of InType in an iterator that deserializes its contents, and returns
+ * it as a stream of objects.
+ *
+ * @param <ObjectType>
+ * @param <InType>
+ */
+public interface Deserializer<ObjectType, InType> {
+  /**
+   * Deserialize a stream of input.
+   *
+   * @param arg
+   * @return
+   */
+  public Iterable<ObjectType> create(InType arg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/SerializableCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/SerializableCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/SerializableCodec.java
new file mode 100644
index 0000000..dbfd1dd
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/SerializableCodec.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.serialization;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.util.logging.Logger;
+
+/**
+ * A {@link Codec} for {@link Serializable} objects.
+ * <p/>
+ * It uses java serialization, use with caution.
+ *
+ * @param <T> The type of objects Serialized
+ */
+public class SerializableCodec<T extends Serializable> implements Codec<T> {
+
+  private static final Logger LOG = Logger.getLogger(SerializableCodec.class.getName());
+
+  /**
+   * Default constructor for TANG use.
+   */
+  @Inject
+  public SerializableCodec() {
+  }
+
+  @Override
+  public byte[] encode(final T obj) {
+    try (final ByteArrayOutputStream bout = new ByteArrayOutputStream()) {
+      try (final ObjectOutputStream out = new ObjectOutputStream(bout)) {
+        out.writeObject(obj);
+      }
+      return bout.toByteArray();
+    } catch (final IOException ex) {
+      throw new RuntimeException("Unable to encode: " + obj, ex);
+    }
+  }
+
+  @Override
+  public T decode(final byte[] buf) {
+    try {
+      try (final ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(buf))) {
+        final T result = (T) oin.readObject();
+        return result;
+      }
+    } catch (final IOException | ClassNotFoundException ex) {
+      throw new RuntimeException("Unable to decode.", ex);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Serializer.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Serializer.java
new file mode 100644
index 0000000..08f3c94
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Serializer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.serialization;
+
+import org.apache.reef.io.Accumulable;
+
+/**
+ * Stream-based multi-object serialization interface. Implementations of this
+ * interface should take an OutType as a constructor parameter.
+ *
+ * @param <ObjectType>
+ * @param <OutType>
+ */
+public interface Serializer<ObjectType, OutType> {
+  /**
+   * Serialize a stream of objects.
+   *
+   * @param arg
+   * @return
+   */
+  public Accumulable<ObjectType> create(OutType arg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/DriverRestartCompleted.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/DriverRestartCompleted.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/DriverRestartCompleted.java
new file mode 100644
index 0000000..843c85e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/DriverRestartCompleted.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common;
+
+import org.apache.reef.wake.time.Time;
+
+public final class DriverRestartCompleted extends Time {
+
+  public DriverRestartCompleted(final long timestamp) {
+    super(timestamp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/Launcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/Launcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/Launcher.java
new file mode 100644
index 0000000..451648b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/Launcher.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common;
+
+import org.apache.reef.runtime.common.launch.LaunchClass;
+import org.apache.reef.runtime.common.launch.REEFErrorHandler;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.ThreadLogger;
+import org.apache.reef.util.logging.LoggingSetup;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The main entrance point into any REEF process. It is mostly instantiating LaunchClass and calling .run() on it.
+ */
+public final class Launcher {
+
+  private final static Logger LOG = Logger.getLogger(Launcher.class.getName());
+
+  static {
+    LoggingSetup.setupCommonsLogging();
+  }
+
+  private Launcher() {
+  }
+
+  /**
+   * Parse command line options of the launcher.
+   *
+   * @param args Command line as passed into main().
+   * @return TANG configuration object.
+   */
+  private static Configuration processCommandLine(
+      final String[] args) throws BindException, IOException, InjectionException {
+
+    final JavaConfigurationBuilder commandLineBuilder =
+        Tang.Factory.getTang().newConfigurationBuilder();
+
+    new CommandLine(commandLineBuilder)
+        .registerShortNameOfClass(ClockConfigurationPath.class)
+        .registerShortNameOfClass(ErrorHandlerRID.class)
+        .registerShortNameOfClass(LaunchID.class)
+        .processCommandLine(args);
+
+    return commandLineBuilder
+        // Bind the wake error handler
+        .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
+        .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER")
+            // Bind the wake codec
+        .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+        .build();
+  }
+
+  private static void fail(final String msg, final Throwable t) {
+    LOG.log(Level.SEVERE, msg, t);
+    throw new RuntimeException(msg, t);
+  }
+
+
+  /**
+   * Launches a REEF client process (Driver or Evaluator).
+   *
+   * @param args
+   * @throws Exception
+   */
+  public static void main(final String[] args) {
+    LOG.log(Level.FINE, "Launcher started with user name [{0}]", System.getProperty("user.name"));
+
+    LOG.log(Level.FINE, "Launcher started. Assertions are {0} in this process.",
+        EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
+    Injector injector = null;
+    try {
+      injector = Tang.Factory.getTang().newInjector(processCommandLine(args));
+    } catch (final BindException | IOException | InjectionException e) {
+      fail("Error in parsing the command line", e);
+    }
+
+    try (final LaunchClass lc = injector.getInstance(LaunchClass.class)) {
+      LOG.log(Level.FINE, "Launcher starting");
+      lc.run();
+      LOG.log(Level.FINE, "Launcher exiting");
+    } catch (final Throwable throwable) {
+      fail("Unable to run LaunchClass", throwable);
+    }
+
+    LOG.log(Level.INFO, "Exiting Launcher.main()");
+    if (LOG.isLoggable(Level.FINEST)) {
+      LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after Launcher.close():"));
+    }
+    System.exit(0);
+    if (LOG.isLoggable(Level.FINEST)) {
+      LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after System.exit():"));
+    }
+  }
+
+  /**
+   * Pass values of the properties specified in the propNames array as <code>-D...</code>
+   * command line parameters. Currently used only to pass logging configuration to child JVMs processes.
+   *
+   * @param vargs     List of command line parameters to append to.
+   * @param copyNull  create an empty parameter if the property is missing in current process.
+   * @param propNames property names.
+   */
+  public static void propagateProperties(
+      final Collection<String> vargs, final boolean copyNull, final String... propNames) {
+    for (final String propName : propNames) {
+      final String propValue = System.getProperty(propName);
+      if (propValue == null || propValue.isEmpty()) {
+        if (copyNull) {
+          vargs.add("-D" + propName);
+        }
+      } else {
+        vargs.add(String.format("-D%s=%s", propName, propValue));
+      }
+    }
+  }
+
+  /**
+   * Same as above, but with copyNull == false by default.
+   */
+  public static void propagateProperties(
+      final Collection<String> vargs, final String... propNames) {
+    propagateProperties(vargs, false, propNames);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/ClientWireUp.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/ClientWireUp.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/ClientWireUp.java
new file mode 100644
index 0000000..b6506ca
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/ClientWireUp.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.client.parameters.ClientPresent;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Used on the Client side to setup event handlers and such.
+ */
+@ClientSide
+@Private
+final class ClientWireUp {
+  private static final Logger LOG = Logger.getLogger(ClientWireUp.class.getName());
+  private final RuntimeErrorProtoHandler runtimeErrorProtoHandler;
+  private final JobStatusMessageHandler jobStatusMessageHandler;
+  private final Optional<RemoteManager> remoteManager;
+  private final boolean isClientPresent;
+  private boolean isWired = false;
+
+  @Inject
+  ClientWireUp(final RemoteManager remoteManager,
+               final @Parameter(ClientPresent.class) String clientPresent,
+               final RuntimeErrorProtoHandler runtimeErrorProtoHandler,
+               final JobStatusMessageHandler jobStatusMessageHandler) {
+    this.remoteManager = Optional.ofNullable(remoteManager);
+    this.runtimeErrorProtoHandler = runtimeErrorProtoHandler;
+    this.jobStatusMessageHandler = jobStatusMessageHandler;
+    this.isClientPresent = clientPresent.equals(ClientPresent.YES);
+    LOG.log(Level.FINE, "Instantiated 'ClientWireUp'. Client present: " + this.isClientPresent());
+  }
+
+  @Inject
+  ClientWireUp(final @Parameter(ClientPresent.class) String clientPresent,
+               final RuntimeErrorProtoHandler runtimeErrorProtoHandler,
+               final JobStatusMessageHandler jobStatusMessageHandler) {
+    this(null, clientPresent, runtimeErrorProtoHandler, jobStatusMessageHandler);
+  }
+
+  synchronized void performWireUp() {
+    if (this.isWired) {
+      throw new IllegalStateException("performWireUp is only to be called once.");
+    }
+    if (this.remoteManager.isPresent()) {
+      LOG.log(Level.FINEST, "Wiring up communications channels to the Driver.");
+      final RemoteManager rm = this.remoteManager.get();
+      rm.registerHandler(ReefServiceProtos.RuntimeErrorProto.class, this.runtimeErrorProtoHandler);
+      rm.registerHandler(ReefServiceProtos.JobStatusProto.class, this.jobStatusMessageHandler);
+      LOG.log(Level.FINE, "Wired up communications channels to the Driver.");
+    }
+    this.isWired = true;
+  }
+
+  synchronized boolean isClientPresent() {
+    return this.isClientPresent;
+  }
+
+  synchronized String getRemoteManagerIdentifier() {
+    if (!this.isClientPresent() || !this.remoteManager.isPresent()) {
+      throw new RuntimeException("No need to setup the remote manager.");
+    } else {
+      return this.remoteManager.get().getMyIdentifier();
+    }
+  }
+
+  /**
+   * Closes the remote manager, if there was one.
+   */
+  synchronized void close() {
+    if (this.remoteManager.isPresent()) {
+      try {
+        this.remoteManager.get().close();
+      } catch (final Exception e) {
+        LOG.log(Level.WARNING, "Exception while shutting down the RemoteManager.", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CommonClientConfigurationModule.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CommonClientConfigurationModule.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CommonClientConfigurationModule.java
new file mode 100644
index 0000000..59892e9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CommonClientConfigurationModule.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+
+
+public class CommonClientConfigurationModule extends ConfigurationModuleBuilder {
+  public final static ConfigurationModule CONF = new CommonClientConfigurationModule()
+      .build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CompletedJobImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CompletedJobImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CompletedJobImpl.java
new file mode 100644
index 0000000..908ea54
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CompletedJobImpl.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.CompletedJob;
+
+/**
+ * An implementation of CompletedJob
+ */
+@ClientSide
+@Private
+final class CompletedJobImpl implements CompletedJob {
+  private final String jobId;
+
+  CompletedJobImpl(final String jobId) {
+    this.jobId = jobId;
+  }
+
+  @Override
+  public String getId() {
+    return this.jobId;
+  }
+
+  @Override
+  public String toString() {
+    return "CompletedJob{'" + jobId + "'}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobStatusMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobStatusMessageHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobStatusMessageHandler.java
new file mode 100644
index 0000000..2ef9706
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobStatusMessageHandler.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A Handler for JobStatus messages from running jobs
+ */
+@ClientSide
+@Private
+final class JobStatusMessageHandler implements EventHandler<RemoteMessage<ReefServiceProtos.JobStatusProto>> {
+  private final Logger LOG = Logger.getLogger(JobStatusMessageHandler.class.getName());
+  private final RunningJobs runningJobs;
+
+  @Inject
+  JobStatusMessageHandler(final RunningJobs runningJobs) {
+    this.runningJobs = runningJobs;
+    LOG.log(Level.FINE, "Instantiated 'JobStatusMessageHandler'");
+  }
+
+  @Override
+  public void onNext(RemoteMessage<ReefServiceProtos.JobStatusProto> jobStatusProtoRemoteMessage) {
+    this.runningJobs.onJobStatusMessage(jobStatusProtoRemoteMessage);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
new file mode 100644
index 0000000..6b89c09
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Shared code between job submission with and without client
+ */
+final class JobSubmissionHelper {
+
+  static {
+    System.out.println(
+        "\nPowered by\n" +
+            "     ___________  ______  ______  _______\n" +
+            "    /  ______  / /  ___/ /  ___/ /  ____/\n" +
+            "   /     _____/ /  /__  /  /__  /  /___\n" +
+            "  /  /\\  \\     /  ___/ /  ___/ /  ____/\n" +
+            " /  /  \\  \\   /  /__  /  /__  /  /\n" +
+            "/__/    \\__\\ /_____/ /_____/ /__/\n"
+    );
+  }
+
+  private static final Logger LOG = Logger.getLogger(JobSubmissionHelper.class.getName());
+
+  private final ConfigurationSerializer configurationSerializer;
+
+  @Inject
+  JobSubmissionHelper(final ConfigurationSerializer configurationSerializer) {
+    this.configurationSerializer = configurationSerializer;
+  }
+
+  /**
+   * Fils out a JobSubmissionProto based on the driver configuration given.
+   *
+   * @param driverConfiguration
+   * @return
+   * @throws InjectionException
+   * @throws IOException
+   */
+  final ClientRuntimeProtocol.JobSubmissionProto.Builder getJobsubmissionProto(final Configuration driverConfiguration) throws InjectionException, IOException {
+    final Injector injector = Tang.Factory.getTang().newInjector(driverConfiguration);
+
+    final ClientRuntimeProtocol.JobSubmissionProto.Builder jbuilder = ClientRuntimeProtocol.JobSubmissionProto.newBuilder()
+        .setIdentifier(returnOrGenerateDriverId(injector.getNamedInstance(DriverIdentifier.class)))
+        .setDriverMemory(injector.getNamedInstance(DriverMemory.class))
+        .setUserName(System.getProperty("user.name"))
+        .setConfiguration(configurationSerializer.toString(driverConfiguration));
+
+
+    for (final String globalFileName : injector.getNamedInstance(JobGlobalFiles.class)) {
+      LOG.log(Level.FINEST, "Adding global file: {0}", globalFileName);
+      jbuilder.addGlobalFile(getFileResourceProto(globalFileName, ReefServiceProtos.FileType.PLAIN));
+    }
+
+    for (final String globalLibraryName : injector.getNamedInstance(JobGlobalLibraries.class)) {
+      LOG.log(Level.FINEST, "Adding global library: {0}", globalLibraryName);
+      jbuilder.addGlobalFile(getFileResourceProto(globalLibraryName, ReefServiceProtos.FileType.LIB));
+    }
+
+    for (final String localFileName : injector.getNamedInstance(DriverLocalFiles.class)) {
+      LOG.log(Level.FINEST, "Adding local file: {0}", localFileName);
+      jbuilder.addLocalFile(getFileResourceProto(localFileName, ReefServiceProtos.FileType.PLAIN));
+    }
+
+    for (final String localLibraryName : injector.getNamedInstance(DriverLocalLibraries.class)) {
+      LOG.log(Level.FINEST, "Adding local library: {0}", localLibraryName);
+      jbuilder.addLocalFile(getFileResourceProto(localLibraryName, ReefServiceProtos.FileType.LIB));
+    }
+
+    return jbuilder;
+  }
+
+
+  /**
+   * @param configuredId
+   * @return the given driver ID (if it is not the default) or generates a new unique one if it is.
+   */
+  private static String returnOrGenerateDriverId(final String configuredId) {
+    final String result;
+    if (configuredId.equals(DriverIdentifier.DEFAULT_VALUE)) {
+      // Generate a uniqe driver ID
+      LOG.log(Level.FINE, "No Job Identifier given. Generating a unique one.");
+      result = "REEF-" + System.getProperty("user.name", "UNKNOWN_USER") + "-" + System.currentTimeMillis();
+    } else {
+      result = configuredId;
+    }
+    return result;
+  }
+
+
+  /**
+   * Turns a pathname into the right protocol for job submission.
+   *
+   * @param fileName
+   * @param type
+   * @return
+   * @throws IOException
+   */
+  private static ReefServiceProtos.FileResourceProto getFileResourceProto(final String fileName, final ReefServiceProtos.FileType type) throws IOException {
+    File file = new File(fileName);
+    if (file.exists()) {
+      // It is a local file and can be added.
+      if (file.isDirectory()) {
+        // If it is a directory, create a JAR file of it and add that instead.
+        file = toJar(file);
+      }
+      return ReefServiceProtos.FileResourceProto.newBuilder()
+          .setName(file.getName())
+          .setPath(file.getPath())
+          .setType(type)
+          .build();
+    } else {
+      // The file isn't in the local filesytem. Assume that the file is actually a URI.
+      // We then assume that the underlying resource manager knows how to deal with it.
+      try {
+        final URI uri = new URI(fileName);
+        final String path = uri.getPath();
+        final String name = path.substring(path.lastIndexOf('/') + 1);
+        return ReefServiceProtos.FileResourceProto.newBuilder()
+            .setName(name)
+            .setPath(uri.toString())
+            .setType(type)
+            .build();
+      } catch (final URISyntaxException e) {
+        throw new IOException("Unable to parse URI.", e);
+      }
+    }
+  }
+
+  /**
+   * Turns temporary folder "foo" into a jar file "foo.jar"
+   *
+   * @param file
+   * @return
+   * @throws IOException
+   */
+  private static File toJar(final File file) throws IOException {
+    final File tempFolder = Files.createTempDirectory("reef-tmp-tempFolder").toFile();
+    final File jarFile = File.createTempFile(file.getCanonicalFile().getName(), ".jar", tempFolder);
+    LOG.log(Level.FINEST, "Adding contents of folder {0} to {1}", new Object[]{file, jarFile});
+    try (final JARFileMaker jarMaker = new JARFileMaker(jarFile)) {
+      jarMaker.addChildren(file);
+    }
+    return jarFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
new file mode 100644
index 0000000..f0074f9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.REEF;
+import org.apache.reef.proto.ClientRuntimeProtocol.JobSubmissionProto;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.util.REEFVersion;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@ClientSide
+@Provided
+@Private
+public final class REEFImplementation implements REEF {
+
+  private final static Logger LOG = Logger.getLogger(REEFImplementation.class.getName());
+
+  private final JobSubmissionHandler jobSubmissionHandler;
+  private final RunningJobs runningJobs;
+  private final JobSubmissionHelper jobSubmissionHelper;
+  private final ClientWireUp clientWireUp;
+  private final LoggingScopeFactory loggingScopeFactory;
+
+  /**
+   * @param jobSubmissionHandler
+   * @param runningJobs
+   * @param jobSubmissionHelper
+   * @param jobStatusMessageHandler is passed only to make sure it is instantiated
+   * @param clientWireUp
+   * @param reefVersion             provides the current version of REEF.
+   */
+  @Inject
+  REEFImplementation(final JobSubmissionHandler jobSubmissionHandler,
+                     final RunningJobs runningJobs,
+                     final JobSubmissionHelper jobSubmissionHelper,
+                     final JobStatusMessageHandler jobStatusMessageHandler,
+                     final ClientWireUp clientWireUp,
+                     final LoggingScopeFactory loggingScopeFactory,
+                     final REEFVersion reefVersion) {
+    this.jobSubmissionHandler = jobSubmissionHandler;
+    this.runningJobs = runningJobs;
+    this.jobSubmissionHelper = jobSubmissionHelper;
+    this.clientWireUp = clientWireUp;
+    clientWireUp.performWireUp();
+    this.loggingScopeFactory = loggingScopeFactory;
+    reefVersion.logVersion();
+  }
+
+  @Override
+  public final void close() {
+    this.runningJobs.closeAllJobs();
+    this.clientWireUp.close();
+  }
+
+  @Override
+  public void submit(final Configuration driverConf) {
+    try (LoggingScope ls = this.loggingScopeFactory.reefSubmit()) {
+      final JobSubmissionProto submissionMessage;
+      try {
+        if (this.clientWireUp.isClientPresent()) {
+          submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConf)
+              .setRemoteId(this.clientWireUp.getRemoteManagerIdentifier())
+              .build();
+        } else {
+          submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConf)
+              .setRemoteId(ErrorHandlerRID.NONE)
+              .build();
+        }
+      } catch (final Exception e) {
+        throw new RuntimeException("Exception while processing driver configuration.", e);
+      }
+
+      this.jobSubmissionHandler.onNext(submissionMessage);
+    }
+  }
+
+  @NamedParameter(doc = "The driver remote identifier.")
+  public final static class DriverRemoteIdentifier implements Name<String> {
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobImpl.java
new file mode 100644
index 0000000..c41597a
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobImpl.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.CompletedJob;
+import org.apache.reef.client.FailedJob;
+import org.apache.reef.client.JobMessage;
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.client.parameters.JobCompletedHandler;
+import org.apache.reef.client.parameters.JobFailedHandler;
+import org.apache.reef.client.parameters.JobMessageHandler;
+import org.apache.reef.client.parameters.JobRunningHandler;
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.proto.ClientRuntimeProtocol.JobControlProto;
+import org.apache.reef.proto.ClientRuntimeProtocol.Signal;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.proto.ReefServiceProtos.JobStatusProto;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Implementation of RunningJob.
+ */
+@ClientSide
+@Private
+public final class RunningJobImpl implements RunningJob, EventHandler<JobStatusProto> {
+
+  private static final Logger LOG = Logger.getLogger(RunningJob.class.getName());
+
+  private final String jobId;
+
+  private final EventHandler<JobControlProto> jobControlHandler;
+  private final EventHandler<RunningJob> runningJobEventHandler;
+  private final EventHandler<CompletedJob> completedJobEventHandler;
+  private final EventHandler<FailedJob> failedJobEventHandler;
+  private final EventHandler<JobMessage> jobMessageEventHandler;
+  private final ExceptionCodec exceptionCodec;
+
+  @Inject
+  RunningJobImpl(final RemoteManager remoteManager,
+                 final @Parameter(DriverIdentifier.class) String driverIdentifier,
+                 final @Parameter(REEFImplementation.DriverRemoteIdentifier.class) String driverRID,
+                 final @Parameter(JobRunningHandler.class) EventHandler<RunningJob> runningJobEventHandler,
+                 final @Parameter(JobCompletedHandler.class) EventHandler<CompletedJob> completedJobEventHandler,
+                 final @Parameter(JobFailedHandler.class) EventHandler<FailedJob> failedJobEventHandler,
+                 final @Parameter(JobMessageHandler.class) EventHandler<JobMessage> jobMessageEventHandler,
+                 final ExceptionCodec exceptionCodec) {
+
+    this.jobId = driverIdentifier;
+    this.runningJobEventHandler = runningJobEventHandler;
+    this.completedJobEventHandler = completedJobEventHandler;
+    this.failedJobEventHandler = failedJobEventHandler;
+    this.jobMessageEventHandler = jobMessageEventHandler;
+    this.exceptionCodec = exceptionCodec;
+    this.jobControlHandler = remoteManager.getHandler(driverRID, JobControlProto.class);
+
+    this.runningJobEventHandler.onNext(this);
+    LOG.log(Level.FINE, "Instantiated 'RunningJobImpl'");
+  }
+
+  @Override
+  public synchronized void close() {
+    this.jobControlHandler.onNext(
+        JobControlProto.newBuilder()
+            .setIdentifier(this.jobId)
+            .setSignal(Signal.SIG_TERMINATE)
+            .build()
+    );
+  }
+
+  @Override
+  public synchronized void close(final byte[] message) {
+    this.jobControlHandler.onNext(
+        JobControlProto.newBuilder()
+            .setIdentifier(this.jobId)
+            .setSignal(Signal.SIG_TERMINATE)
+            .setMessage(ByteString.copyFrom(message))
+            .build()
+    );
+  }
+
+  @Override
+  public String getId() {
+    return this.jobId;
+  }
+
+  @Override
+  public synchronized void send(final byte[] message) {
+    this.jobControlHandler.onNext(
+        JobControlProto.newBuilder()
+            .setIdentifier(this.jobId)
+            .setMessage(ByteString.copyFrom(message))
+            .build()
+    );
+  }
+
+  @Override
+  public synchronized void onNext(final JobStatusProto value) {
+
+    final ReefServiceProtos.State state = value.getState();
+    LOG.log(Level.FINEST, "Received job status: {0} from {1}",
+        new Object[]{state, value.getIdentifier()});
+
+    if (value.hasMessage()) {
+      this.jobMessageEventHandler.onNext(
+          new JobMessage(getId(), value.getMessage().toByteArray()));
+    }
+    if (state == ReefServiceProtos.State.DONE) {
+      this.completedJobEventHandler.onNext(new CompletedJobImpl(this.getId()));
+    } else if (state == ReefServiceProtos.State.FAILED) {
+      this.onJobFailure(value);
+    }
+  }
+
+  /**
+   * Inform the client of a failed job.
+   *
+   * @param jobStatusProto
+   */
+  private synchronized void onJobFailure(final JobStatusProto jobStatusProto) {
+    assert (jobStatusProto.getState() == ReefServiceProtos.State.FAILED);
+
+    final String id = this.jobId;
+    final Optional<byte[]> data = jobStatusProto.hasException() ?
+        Optional.of(jobStatusProto.getException().toByteArray()) :
+        Optional.<byte[]>empty();
+    final Optional<Throwable> cause = this.exceptionCodec.fromBytes(data);
+
+    final String message = cause.isPresent() ?
+        cause.get().getMessage() :
+        "No Message sent by the Job";
+    final Optional<String> description = Optional.of(message);
+
+    final FailedJob failedJob = new FailedJob(id, message, description, cause, data);
+    this.failedJobEventHandler.onNext(failedJob);
+  }
+
+  @Override
+  public String toString() {
+    return "RunningJob{'" + this.jobId + "'}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobs.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobs.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobs.java
new file mode 100644
index 0000000..7c9bbb7
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobs.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+/**
+ * Manages the RunningJobs a client knows about
+ */
+@Private
+@ClientSide
+@DefaultImplementation(RunningJobsImpl.class)
+interface RunningJobs {
+
+  /**
+   * Closes all registered jobs forcefully.
+   */
+  public void closeAllJobs();
+
+  /**
+   * Processes a status message from a Job. If the Job is already known, the message will be passed on. If it is a
+   * first message, a new RunningJob instance will be created for it.
+   *
+   * @param message
+   */
+  public void onJobStatusMessage(final RemoteMessage<ReefServiceProtos.JobStatusProto> message);
+
+  /**
+   * Processes a error message from the resource manager.
+   *
+   * @param runtimeFailure
+   */
+  public void onRuntimeErrorMessage(final RemoteMessage<ReefServiceProtos.RuntimeErrorProto> runtimeFailure);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobsImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobsImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobsImpl.java
new file mode 100644
index 0000000..7a9ed05
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobsImpl.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.FailedRuntime;
+import org.apache.reef.client.parameters.ResourceManagerErrorHandler;
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@ClientSide
+@Private
+final class RunningJobsImpl implements RunningJobs {
+  private static final Logger LOG = Logger.getLogger(RunningJobsImpl.class.getName());
+  private final Map<String, RunningJobImpl> jobs = new HashMap<>();
+  private final Injector injector;
+  private final InjectionFuture<EventHandler<FailedRuntime>> failedRuntimeEventHandler;
+
+  @Inject
+  RunningJobsImpl(final Injector injector,
+                  final @Parameter(ResourceManagerErrorHandler.class) InjectionFuture<EventHandler<FailedRuntime>> failedRuntimeEventHandler) {
+    this.injector = injector;
+    this.failedRuntimeEventHandler = failedRuntimeEventHandler;
+    LOG.log(Level.FINE, "Instantiated 'RunningJobImpl'");
+  }
+
+
+  @Override
+  public synchronized void closeAllJobs() {
+    for (final RunningJobImpl runningJob : this.jobs.values()) {
+      LOG.log(Level.WARNING, "Force close job {0}", runningJob.getId());
+      runningJob.close();
+    }
+  }
+
+  @Override
+  public synchronized void onJobStatusMessage(final RemoteMessage<ReefServiceProtos.JobStatusProto> message) {
+    final ReefServiceProtos.JobStatusProto status = message.getMessage();
+    final String jobIdentifier = status.getIdentifier();
+    LOG.log(Level.FINE, "Processing message from Job: " + jobIdentifier);
+
+    if (status.getState() == ReefServiceProtos.State.INIT) {
+      try {
+        final RunningJobImpl runningJob = this.newRunningJob(status.getIdentifier(), message.getIdentifier().toString());
+        this.put(runningJob);
+      } catch (final BindException | InjectionException configError) {
+        throw new RuntimeException("Configuration error for: " + status, configError);
+      }
+    }
+
+    this.get(jobIdentifier).onNext(status);
+    if ((status.getState() != ReefServiceProtos.State.RUNNING) &&
+        (status.getState() != ReefServiceProtos.State.INIT)) {
+      this.remove(status.getIdentifier());
+    }
+    LOG.log(Level.FINE, "Done processing message from Job " + jobIdentifier);
+  }
+
+  @Override
+  public synchronized void onRuntimeErrorMessage(RemoteMessage<ReefServiceProtos.RuntimeErrorProto> runtimeFailure) {
+    try {
+      this.remove(runtimeFailure.getMessage().getIdentifier());
+    } finally {
+      this.failedRuntimeEventHandler.get().onNext(new FailedRuntime(runtimeFailure.getMessage()));
+    }
+  }
+
+
+  /**
+   * A guarded get() that throws an exception if the RunningJob isn't known
+   *
+   * @param jobIdentifier
+   * @return
+   */
+  private synchronized RunningJobImpl get(final String jobIdentifier) {
+    final RunningJobImpl result = this.jobs.get(jobIdentifier);
+    if (null == result) {
+      throw new RuntimeException("Trying to get a RunningJob that is unknown: " + jobIdentifier);
+    }
+    return result;
+  }
+
+  /**
+   * A guarded remove() that throws an exception if no RunningJob is known for this id.
+   *
+   * @param jobIdentifier
+   */
+  private synchronized void remove(final String jobIdentifier) {
+    final RunningJobImpl result = this.jobs.remove(jobIdentifier);
+    if (null == result) {
+      throw new RuntimeException("Trying to remove a RunningJob that is unknown: " + jobIdentifier);
+    }
+  }
+
+
+  private synchronized void put(final RunningJobImpl runningJob) {
+    final String jobIdentifier = runningJob.getId();
+    if (this.jobs.containsKey(jobIdentifier)) {
+      throw new IllegalStateException("Trying to re-add a job that is already known: " + jobIdentifier);
+    }
+    LOG.log(Level.FINE, "Adding Job with ID: " + jobIdentifier);
+    this.jobs.put(jobIdentifier, runningJob);
+  }
+
+  /**
+   * @param jobIdentifier
+   * @param remoteIdentifier
+   * @return
+   * @throws BindException
+   * @throws InjectionException
+   */
+  private synchronized RunningJobImpl newRunningJob(final String jobIdentifier, final String remoteIdentifier) throws BindException, InjectionException {
+    final Injector child = this.injector.forkInjector();
+    child.bindVolatileParameter(REEFImplementation.DriverRemoteIdentifier.class, remoteIdentifier);
+    child.bindVolatileParameter(DriverIdentifier.class, jobIdentifier);
+    return child.getInstance(RunningJobImpl.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RuntimeErrorProtoHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RuntimeErrorProtoHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RuntimeErrorProtoHandler.java
new file mode 100644
index 0000000..71d7895
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RuntimeErrorProtoHandler.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Used in REEFImplementation.
+ */
+final class RuntimeErrorProtoHandler implements EventHandler<RemoteMessage<ReefServiceProtos.RuntimeErrorProto>> {
+
+  private final static Logger LOG = Logger.getLogger(RuntimeErrorProtoHandler.class.getName());
+
+  private final InjectionFuture<RunningJobs> runningJobs;
+
+  @Inject
+  RuntimeErrorProtoHandler(final InjectionFuture<RunningJobs> runningJobs) {
+    this.runningJobs = runningJobs;
+  }
+
+
+  @Override
+  public void onNext(final RemoteMessage<ReefServiceProtos.RuntimeErrorProto> error) {
+    LOG.log(Level.WARNING, "{0} Runtime Error: {1}", new Object[]{
+        error.getIdentifier(), error.getMessage().getMessage()});
+    this.runningJobs.get().onRuntimeErrorMessage(error);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/ClientRuntimeParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/ClientRuntimeParameters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/ClientRuntimeParameters.java
new file mode 100644
index 0000000..cdc418b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/ClientRuntimeParameters.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.api;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+public final class ClientRuntimeParameters {
+
+  @NamedParameter(doc = "The runtime error handler RID.")
+  public final static class RuntimeErrorHandlerRID implements Name<String> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
new file mode 100644
index 0000000..af2d064
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.ClientRuntimeProtocol.JobSubmissionProto;
+import org.apache.reef.wake.EventHandler;
+
+@RuntimeAuthor
+public interface JobSubmissionHandler extends EventHandler<JobSubmissionProto>, AutoCloseable {
+
+  @Override
+  public void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/package-info.java
new file mode 100644
index 0000000..7480e61
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Client-Side Event Handlers to be implemented by a specific resourcemanager
+ */
+package org.apache.reef.runtime.common.client.api;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultCompletedJobHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultCompletedJobHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultCompletedJobHandler.java
new file mode 100644
index 0000000..2f4d7d4
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultCompletedJobHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.CompletedJob;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for CompletedJob: Logging it.
+ */
+@Provided
+@ClientSide
+public final class DefaultCompletedJobHandler implements EventHandler<CompletedJob> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultCompletedJobHandler.class.getName());
+
+  @Inject
+  private DefaultCompletedJobHandler() {
+  }
+
+  @Override
+  public void onNext(final CompletedJob job) {
+    LOG.log(Level.INFO, "Job Completed: {0}", job);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultFailedJobHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultFailedJobHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultFailedJobHandler.java
new file mode 100644
index 0000000..076a316
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultFailedJobHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.FailedJob;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default event handler for FailedJob: rethrow the exception.
+ */
+@Provided
+@ClientSide
+public final class DefaultFailedJobHandler implements EventHandler<FailedJob> {
+
+  @Inject
+  private DefaultFailedJobHandler() {
+  }
+
+  @Override
+  public void onNext(final FailedJob job) {
+    throw new RuntimeException("REEF job failed: " + job.getId(), job.getReason().orElse(null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultJobMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultJobMessageHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultJobMessageHandler.java
new file mode 100644
index 0000000..59a6115
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultJobMessageHandler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.JobMessage;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * Default event handler for job message: Logging it.
+ */
+@Provided
+@ClientSide
+public final class DefaultJobMessageHandler implements EventHandler<JobMessage> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultJobMessageHandler.class.getName());
+
+  @Inject
+  private DefaultJobMessageHandler() {
+  }
+
+  @Override
+  public void onNext(final JobMessage message) {
+    LOG.log(Level.FINE, "Received message: {0}", message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRunningJobHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRunningJobHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRunningJobHandler.java
new file mode 100644
index 0000000..4b94c53
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRunningJobHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for RunningJob: Logging it.
+ */
+@Provided
+@ClientSide
+public final class DefaultRunningJobHandler implements EventHandler<RunningJob> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultRunningJobHandler.class.getName());
+
+  @Inject
+  private DefaultRunningJobHandler() {
+  }
+
+  @Override
+  public void onNext(final RunningJob job) {
+    LOG.log(Level.INFO, "Job is running: {0}", job);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRuntimeErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRuntimeErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRuntimeErrorHandler.java
new file mode 100644
index 0000000..e72706b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRuntimeErrorHandler.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.FailedRuntime;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for REEF FailedRuntime: rethrow the exception.
+ */
+@Provided
+@ClientSide
+public final class DefaultRuntimeErrorHandler implements EventHandler<FailedRuntime> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultRuntimeErrorHandler.class.getName());
+
+  @Inject
+  private DefaultRuntimeErrorHandler() {
+  }
+
+  @Override
+  public void onNext(final FailedRuntime error) {
+    if (error.getReason().isPresent()) {
+      LOG.log(Level.SEVERE, "Runtime error: " + error, error.getReason().get());
+    } else {
+      LOG.log(Level.SEVERE, "Runtime error: " + error);
+    }
+    throw new RuntimeException("REEF runtime error: " + error, error.asError());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/package-info.java
new file mode 100644
index 0000000..6263ab1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Default implementations for the optional client-side event handlers.
+ */
+package org.apache.reef.runtime.common.client.defaults;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/package-info.java
new file mode 100644
index 0000000..9af9dfe
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of the client-side REEF API.
+ */
+package org.apache.reef.runtime.common.client;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/parameters/ClientPresent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/parameters/ClientPresent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/parameters/ClientPresent.java
new file mode 100644
index 0000000..0d0e361
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/parameters/ClientPresent.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.parameters;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * A Tang named parameter that indicates whether or not a client (handler for the various Job events) is present.
+ */
+@ClientSide
+@NamedParameter(doc = "Indicates whether or not a client is present", default_value = ClientPresent.NO)
+public final class ClientPresent implements Name<String> {
+  public static final String YES = "YES";
+  public static final String NO = "NO";
+}