You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:14 UTC
[41/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/Naming.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/Naming.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/Naming.java
new file mode 100644
index 0000000..8779b95
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/Naming.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.naming;
+
+/**
+ * Implementations of this interface facilitate address lookups based on
+ * Identifiers and allow to register and unregister addresses for Identifiers.
+ */
+public interface Naming extends NamingLookup, NamingRegistry {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingLookup.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingLookup.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingLookup.java
new file mode 100644
index 0000000..4fb7d15
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingLookup.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.naming;
+
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.wake.Identifier;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Implementations of this interface facilitate address lookups based on
+ * Identifiers.
+ */
+@Public
+public interface NamingLookup {
+
+ /**
+ * Lookup an Address for a given Identifier.
+ *
+ * @param id
+ * @return
+ * @throws IOException
+ */
+ public InetSocketAddress lookup(final Identifier id) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingRegistry.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingRegistry.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingRegistry.java
new file mode 100644
index 0000000..0d4f6c5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/naming/NamingRegistry.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.naming;
+
+import org.apache.reef.wake.Identifier;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Allows to register and unregister addresses for Identifiers.
+ */
+public interface NamingRegistry {
+
+ /**
+ * Register the given Address for the given Identifier
+ *
+ * @param id
+ * @param addr
+ * @throws Exception
+ */
+ public void register(final Identifier id, final InetSocketAddress addr) throws Exception;
+
+ /**
+ * Unregister the given Identifier from the registry
+ *
+ * @param id
+ * @throws Exception
+ */
+ public void unregister(final Identifier id) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/package-info.java
new file mode 100644
index 0000000..702f401
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * APIs for I/O in REEF: {@link org.apache.reef.io.Codec}s and {@link org.apache.reef.io.Serializer}s
+ */
+@Unstable package org.apache.reef.io;
+
+import org.apache.reef.annotations.Unstable;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Codec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Codec.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Codec.java
new file mode 100644
index 0000000..5e69d4f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Codec.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.serialization;
+
+/**
+ * Interface for serialization routines that translate back and forth between
+ * byte arrays with low latency. (Contrast to Serializer, Deserializer, which
+ * optimize for file size and throughput.)
+ *
+ * @param <T> The type of the objects (de-)serialized
+ */
+public interface Codec<T> {
+
+ /**
+ * Encodes the given object into a Byte Array
+ *
+ * @param obj
+ * @return a byte[] representation of the object
+ */
+ public byte[] encode(T obj);
+
+ /**
+ * Decodes the given byte array into an object
+ *
+ * @param buf
+ * @return the decoded object
+ */
+ public T decode(byte[] buf);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Deserializer.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Deserializer.java
new file mode 100644
index 0000000..25ffe0a
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Deserializer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.serialization;
+
+/**
+ * Stream-based multi-object deserialization interface. This class wraps an
+ * instance of InType in an iterator that deserializes its contents, and returns
+ * it as a stream of objects.
+ *
+ * @param <ObjectType>
+ * @param <InType>
+ */
+public interface Deserializer<ObjectType, InType> {
+ /**
+ * Deserialize a stream of input.
+ *
+ * @param arg
+ * @return
+ */
+ public Iterable<ObjectType> create(InType arg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/SerializableCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/SerializableCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/SerializableCodec.java
new file mode 100644
index 0000000..dbfd1dd
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/SerializableCodec.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.serialization;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.util.logging.Logger;
+
+/**
+ * A {@link Codec} for {@link Serializable} objects.
+ * <p/>
+ * It uses java serialization, use with caution.
+ *
+ * @param <T> The type of objects Serialized
+ */
+public class SerializableCodec<T extends Serializable> implements Codec<T> {
+
+ private static final Logger LOG = Logger.getLogger(SerializableCodec.class.getName());
+
+ /**
+ * Default constructor for TANG use.
+ */
+ @Inject
+ public SerializableCodec() {
+ }
+
+ @Override
+ public byte[] encode(final T obj) {
+ try (final ByteArrayOutputStream bout = new ByteArrayOutputStream()) {
+ try (final ObjectOutputStream out = new ObjectOutputStream(bout)) {
+ out.writeObject(obj);
+ }
+ return bout.toByteArray();
+ } catch (final IOException ex) {
+ throw new RuntimeException("Unable to encode: " + obj, ex);
+ }
+ }
+
+ @Override
+ public T decode(final byte[] buf) {
+ try {
+ try (final ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(buf))) {
+ final T result = (T) oin.readObject();
+ return result;
+ }
+ } catch (final IOException | ClassNotFoundException ex) {
+ throw new RuntimeException("Unable to decode.", ex);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Serializer.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Serializer.java
new file mode 100644
index 0000000..08f3c94
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/serialization/Serializer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.serialization;
+
+import org.apache.reef.io.Accumulable;
+
+/**
+ * Stream-based multi-object serialization interface. Implementations of this
+ * interface should take an OutType as a constructor parameter.
+ *
+ * @param <ObjectType>
+ * @param <OutType>
+ */
+public interface Serializer<ObjectType, OutType> {
+ /**
+ * Serialize a stream of objects.
+ *
+ * @param arg
+ * @return
+ */
+ public Accumulable<ObjectType> create(OutType arg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/DriverRestartCompleted.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/DriverRestartCompleted.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/DriverRestartCompleted.java
new file mode 100644
index 0000000..843c85e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/DriverRestartCompleted.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common;
+
+import org.apache.reef.wake.time.Time;
+
+public final class DriverRestartCompleted extends Time {
+
+ public DriverRestartCompleted(final long timestamp) {
+ super(timestamp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/Launcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/Launcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/Launcher.java
new file mode 100644
index 0000000..451648b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/Launcher.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common;
+
+import org.apache.reef.runtime.common.launch.LaunchClass;
+import org.apache.reef.runtime.common.launch.REEFErrorHandler;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.ThreadLogger;
+import org.apache.reef.util.logging.LoggingSetup;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The main entrance point into any REEF process. It is mostly instantiating LaunchClass and calling .run() on it.
+ */
+public final class Launcher {
+
+ private final static Logger LOG = Logger.getLogger(Launcher.class.getName());
+
+ static {
+ LoggingSetup.setupCommonsLogging();
+ }
+
+ private Launcher() {
+ }
+
+ /**
+ * Parse command line options of the launcher.
+ *
+ * @param args Command line as passed into main().
+ * @return TANG configuration object.
+ */
+ private static Configuration processCommandLine(
+ final String[] args) throws BindException, IOException, InjectionException {
+
+ final JavaConfigurationBuilder commandLineBuilder =
+ Tang.Factory.getTang().newConfigurationBuilder();
+
+ new CommandLine(commandLineBuilder)
+ .registerShortNameOfClass(ClockConfigurationPath.class)
+ .registerShortNameOfClass(ErrorHandlerRID.class)
+ .registerShortNameOfClass(LaunchID.class)
+ .processCommandLine(args);
+
+ return commandLineBuilder
+ // Bind the wake error handler
+ .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
+ .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER")
+ // Bind the wake codec
+ .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+ .build();
+ }
+
+ private static void fail(final String msg, final Throwable t) {
+ LOG.log(Level.SEVERE, msg, t);
+ throw new RuntimeException(msg, t);
+ }
+
+
+ /**
+ * Launches a REEF client process (Driver or Evaluator).
+ *
+ * @param args
+ * @throws Exception
+ */
+ public static void main(final String[] args) {
+ LOG.log(Level.FINE, "Launcher started with user name [{0}]", System.getProperty("user.name"));
+
+ LOG.log(Level.FINE, "Launcher started. Assertions are {0} in this process.",
+ EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
+ Injector injector = null;
+ try {
+ injector = Tang.Factory.getTang().newInjector(processCommandLine(args));
+ } catch (final BindException | IOException | InjectionException e) {
+ fail("Error in parsing the command line", e);
+ }
+
+ try (final LaunchClass lc = injector.getInstance(LaunchClass.class)) {
+ LOG.log(Level.FINE, "Launcher starting");
+ lc.run();
+ LOG.log(Level.FINE, "Launcher exiting");
+ } catch (final Throwable throwable) {
+ fail("Unable to run LaunchClass", throwable);
+ }
+
+ LOG.log(Level.INFO, "Exiting Launcher.main()");
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after Launcher.close():"));
+ }
+ System.exit(0);
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after System.exit():"));
+ }
+ }
+
+ /**
+ * Pass values of the properties specified in the propNames array as <code>-D...</code>
+ * command line parameters. Currently used only to pass logging configuration to child JVMs processes.
+ *
+ * @param vargs List of command line parameters to append to.
+ * @param copyNull create an empty parameter if the property is missing in current process.
+ * @param propNames property names.
+ */
+ public static void propagateProperties(
+ final Collection<String> vargs, final boolean copyNull, final String... propNames) {
+ for (final String propName : propNames) {
+ final String propValue = System.getProperty(propName);
+ if (propValue == null || propValue.isEmpty()) {
+ if (copyNull) {
+ vargs.add("-D" + propName);
+ }
+ } else {
+ vargs.add(String.format("-D%s=%s", propName, propValue));
+ }
+ }
+ }
+
+ /**
+ * Same as above, but with copyNull == false by default.
+ */
+ public static void propagateProperties(
+ final Collection<String> vargs, final String... propNames) {
+ propagateProperties(vargs, false, propNames);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/ClientWireUp.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/ClientWireUp.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/ClientWireUp.java
new file mode 100644
index 0000000..b6506ca
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/ClientWireUp.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.client.parameters.ClientPresent;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Used on the Client side to setup event handlers and such.
+ */
+@ClientSide
+@Private
+final class ClientWireUp {
+ private static final Logger LOG = Logger.getLogger(ClientWireUp.class.getName());
+ private final RuntimeErrorProtoHandler runtimeErrorProtoHandler;
+ private final JobStatusMessageHandler jobStatusMessageHandler;
+ private final Optional<RemoteManager> remoteManager;
+ private final boolean isClientPresent;
+ private boolean isWired = false;
+
+ @Inject
+ ClientWireUp(final RemoteManager remoteManager,
+ final @Parameter(ClientPresent.class) String clientPresent,
+ final RuntimeErrorProtoHandler runtimeErrorProtoHandler,
+ final JobStatusMessageHandler jobStatusMessageHandler) {
+ this.remoteManager = Optional.ofNullable(remoteManager);
+ this.runtimeErrorProtoHandler = runtimeErrorProtoHandler;
+ this.jobStatusMessageHandler = jobStatusMessageHandler;
+ this.isClientPresent = clientPresent.equals(ClientPresent.YES);
+ LOG.log(Level.FINE, "Instantiated 'ClientWireUp'. Client present: " + this.isClientPresent());
+ }
+
+ @Inject
+ ClientWireUp(final @Parameter(ClientPresent.class) String clientPresent,
+ final RuntimeErrorProtoHandler runtimeErrorProtoHandler,
+ final JobStatusMessageHandler jobStatusMessageHandler) {
+ this(null, clientPresent, runtimeErrorProtoHandler, jobStatusMessageHandler);
+ }
+
+ synchronized void performWireUp() {
+ if (this.isWired) {
+ throw new IllegalStateException("performWireUp is only to be called once.");
+ }
+ if (this.remoteManager.isPresent()) {
+ LOG.log(Level.FINEST, "Wiring up communications channels to the Driver.");
+ final RemoteManager rm = this.remoteManager.get();
+ rm.registerHandler(ReefServiceProtos.RuntimeErrorProto.class, this.runtimeErrorProtoHandler);
+ rm.registerHandler(ReefServiceProtos.JobStatusProto.class, this.jobStatusMessageHandler);
+ LOG.log(Level.FINE, "Wired up communications channels to the Driver.");
+ }
+ this.isWired = true;
+ }
+
+ synchronized boolean isClientPresent() {
+ return this.isClientPresent;
+ }
+
+ synchronized String getRemoteManagerIdentifier() {
+ if (!this.isClientPresent() || !this.remoteManager.isPresent()) {
+ throw new RuntimeException("No need to setup the remote manager.");
+ } else {
+ return this.remoteManager.get().getMyIdentifier();
+ }
+ }
+
+ /**
+ * Closes the remote manager, if there was one.
+ */
+ synchronized void close() {
+ if (this.remoteManager.isPresent()) {
+ try {
+ this.remoteManager.get().close();
+ } catch (final Exception e) {
+ LOG.log(Level.WARNING, "Exception while shutting down the RemoteManager.", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CommonClientConfigurationModule.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CommonClientConfigurationModule.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CommonClientConfigurationModule.java
new file mode 100644
index 0000000..59892e9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CommonClientConfigurationModule.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+
+
+public class CommonClientConfigurationModule extends ConfigurationModuleBuilder {
+ public final static ConfigurationModule CONF = new CommonClientConfigurationModule()
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CompletedJobImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CompletedJobImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CompletedJobImpl.java
new file mode 100644
index 0000000..908ea54
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/CompletedJobImpl.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.CompletedJob;
+
+/**
+ * An implementation of CompletedJob
+ */
+@ClientSide
+@Private
+final class CompletedJobImpl implements CompletedJob {
+ private final String jobId;
+
+ CompletedJobImpl(final String jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public String getId() {
+ return this.jobId;
+ }
+
+ @Override
+ public String toString() {
+ return "CompletedJob{'" + jobId + "'}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobStatusMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobStatusMessageHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobStatusMessageHandler.java
new file mode 100644
index 0000000..2ef9706
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobStatusMessageHandler.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A Handler for JobStatus messages from running jobs
+ */
+@ClientSide
+@Private
+final class JobStatusMessageHandler implements EventHandler<RemoteMessage<ReefServiceProtos.JobStatusProto>> {
+ private final Logger LOG = Logger.getLogger(JobStatusMessageHandler.class.getName());
+ private final RunningJobs runningJobs;
+
+ @Inject
+ JobStatusMessageHandler(final RunningJobs runningJobs) {
+ this.runningJobs = runningJobs;
+ LOG.log(Level.FINE, "Instantiated 'JobStatusMessageHandler'");
+ }
+
+ @Override
+ public void onNext(RemoteMessage<ReefServiceProtos.JobStatusProto> jobStatusProtoRemoteMessage) {
+ this.runningJobs.onJobStatusMessage(jobStatusProtoRemoteMessage);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
new file mode 100644
index 0000000..6b89c09
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Shared code between job submission with and without client
+ */
+final class JobSubmissionHelper {
+
+ static {
+ System.out.println(
+ "\nPowered by\n" +
+ " ___________ ______ ______ _______\n" +
+ " / ______ / / ___/ / ___/ / ____/\n" +
+ " / _____/ / /__ / /__ / /___\n" +
+ " / /\\ \\ / ___/ / ___/ / ____/\n" +
+ " / / \\ \\ / /__ / /__ / /\n" +
+ "/__/ \\__\\ /_____/ /_____/ /__/\n"
+ );
+ }
+
+ private static final Logger LOG = Logger.getLogger(JobSubmissionHelper.class.getName());
+
+ private final ConfigurationSerializer configurationSerializer;
+
+ @Inject
+ JobSubmissionHelper(final ConfigurationSerializer configurationSerializer) {
+ this.configurationSerializer = configurationSerializer;
+ }
+
+ /**
+ * Fils out a JobSubmissionProto based on the driver configuration given.
+ *
+ * @param driverConfiguration
+ * @return
+ * @throws InjectionException
+ * @throws IOException
+ */
+ final ClientRuntimeProtocol.JobSubmissionProto.Builder getJobsubmissionProto(final Configuration driverConfiguration) throws InjectionException, IOException {
+ final Injector injector = Tang.Factory.getTang().newInjector(driverConfiguration);
+
+ final ClientRuntimeProtocol.JobSubmissionProto.Builder jbuilder = ClientRuntimeProtocol.JobSubmissionProto.newBuilder()
+ .setIdentifier(returnOrGenerateDriverId(injector.getNamedInstance(DriverIdentifier.class)))
+ .setDriverMemory(injector.getNamedInstance(DriverMemory.class))
+ .setUserName(System.getProperty("user.name"))
+ .setConfiguration(configurationSerializer.toString(driverConfiguration));
+
+
+ for (final String globalFileName : injector.getNamedInstance(JobGlobalFiles.class)) {
+ LOG.log(Level.FINEST, "Adding global file: {0}", globalFileName);
+ jbuilder.addGlobalFile(getFileResourceProto(globalFileName, ReefServiceProtos.FileType.PLAIN));
+ }
+
+ for (final String globalLibraryName : injector.getNamedInstance(JobGlobalLibraries.class)) {
+ LOG.log(Level.FINEST, "Adding global library: {0}", globalLibraryName);
+ jbuilder.addGlobalFile(getFileResourceProto(globalLibraryName, ReefServiceProtos.FileType.LIB));
+ }
+
+ for (final String localFileName : injector.getNamedInstance(DriverLocalFiles.class)) {
+ LOG.log(Level.FINEST, "Adding local file: {0}", localFileName);
+ jbuilder.addLocalFile(getFileResourceProto(localFileName, ReefServiceProtos.FileType.PLAIN));
+ }
+
+ for (final String localLibraryName : injector.getNamedInstance(DriverLocalLibraries.class)) {
+ LOG.log(Level.FINEST, "Adding local library: {0}", localLibraryName);
+ jbuilder.addLocalFile(getFileResourceProto(localLibraryName, ReefServiceProtos.FileType.LIB));
+ }
+
+ return jbuilder;
+ }
+
+
+ /**
+ * @param configuredId
+ * @return the given driver ID (if it is not the default) or generates a new unique one if it is.
+ */
+ private static String returnOrGenerateDriverId(final String configuredId) {
+ final String result;
+ if (configuredId.equals(DriverIdentifier.DEFAULT_VALUE)) {
+ // Generate a uniqe driver ID
+ LOG.log(Level.FINE, "No Job Identifier given. Generating a unique one.");
+ result = "REEF-" + System.getProperty("user.name", "UNKNOWN_USER") + "-" + System.currentTimeMillis();
+ } else {
+ result = configuredId;
+ }
+ return result;
+ }
+
+
+ /**
+ * Turns a pathname into the right protocol for job submission.
+ *
+ * @param fileName
+ * @param type
+ * @return
+ * @throws IOException
+ */
+ private static ReefServiceProtos.FileResourceProto getFileResourceProto(final String fileName, final ReefServiceProtos.FileType type) throws IOException {
+ File file = new File(fileName);
+ if (file.exists()) {
+ // It is a local file and can be added.
+ if (file.isDirectory()) {
+ // If it is a directory, create a JAR file of it and add that instead.
+ file = toJar(file);
+ }
+ return ReefServiceProtos.FileResourceProto.newBuilder()
+ .setName(file.getName())
+ .setPath(file.getPath())
+ .setType(type)
+ .build();
+ } else {
+ // The file isn't in the local filesytem. Assume that the file is actually a URI.
+ // We then assume that the underlying resource manager knows how to deal with it.
+ try {
+ final URI uri = new URI(fileName);
+ final String path = uri.getPath();
+ final String name = path.substring(path.lastIndexOf('/') + 1);
+ return ReefServiceProtos.FileResourceProto.newBuilder()
+ .setName(name)
+ .setPath(uri.toString())
+ .setType(type)
+ .build();
+ } catch (final URISyntaxException e) {
+ throw new IOException("Unable to parse URI.", e);
+ }
+ }
+ }
+
+ /**
+ * Turns temporary folder "foo" into a jar file "foo.jar"
+ *
+ * @param file
+ * @return
+ * @throws IOException
+ */
+ private static File toJar(final File file) throws IOException {
+ final File tempFolder = Files.createTempDirectory("reef-tmp-tempFolder").toFile();
+ final File jarFile = File.createTempFile(file.getCanonicalFile().getName(), ".jar", tempFolder);
+ LOG.log(Level.FINEST, "Adding contents of folder {0} to {1}", new Object[]{file, jarFile});
+ try (final JARFileMaker jarMaker = new JARFileMaker(jarFile)) {
+ jarMaker.addChildren(file);
+ }
+ return jarFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
new file mode 100644
index 0000000..f0074f9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.REEF;
+import org.apache.reef.proto.ClientRuntimeProtocol.JobSubmissionProto;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.util.REEFVersion;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@ClientSide
+@Provided
+@Private
+public final class REEFImplementation implements REEF {
+
+ private final static Logger LOG = Logger.getLogger(REEFImplementation.class.getName());
+
+ private final JobSubmissionHandler jobSubmissionHandler;
+ private final RunningJobs runningJobs;
+ private final JobSubmissionHelper jobSubmissionHelper;
+ private final ClientWireUp clientWireUp;
+ private final LoggingScopeFactory loggingScopeFactory;
+
+ /**
+ * @param jobSubmissionHandler
+ * @param runningJobs
+ * @param jobSubmissionHelper
+ * @param jobStatusMessageHandler is passed only to make sure it is instantiated
+ * @param clientWireUp
+ * @param reefVersion provides the current version of REEF.
+ */
+ @Inject
+ REEFImplementation(final JobSubmissionHandler jobSubmissionHandler,
+ final RunningJobs runningJobs,
+ final JobSubmissionHelper jobSubmissionHelper,
+ final JobStatusMessageHandler jobStatusMessageHandler,
+ final ClientWireUp clientWireUp,
+ final LoggingScopeFactory loggingScopeFactory,
+ final REEFVersion reefVersion) {
+ this.jobSubmissionHandler = jobSubmissionHandler;
+ this.runningJobs = runningJobs;
+ this.jobSubmissionHelper = jobSubmissionHelper;
+ this.clientWireUp = clientWireUp;
+ clientWireUp.performWireUp();
+ this.loggingScopeFactory = loggingScopeFactory;
+ reefVersion.logVersion();
+ }
+
+ @Override
+ public final void close() {
+ this.runningJobs.closeAllJobs();
+ this.clientWireUp.close();
+ }
+
+ @Override
+ public void submit(final Configuration driverConf) {
+ try (LoggingScope ls = this.loggingScopeFactory.reefSubmit()) {
+ final JobSubmissionProto submissionMessage;
+ try {
+ if (this.clientWireUp.isClientPresent()) {
+ submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConf)
+ .setRemoteId(this.clientWireUp.getRemoteManagerIdentifier())
+ .build();
+ } else {
+ submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConf)
+ .setRemoteId(ErrorHandlerRID.NONE)
+ .build();
+ }
+ } catch (final Exception e) {
+ throw new RuntimeException("Exception while processing driver configuration.", e);
+ }
+
+ this.jobSubmissionHandler.onNext(submissionMessage);
+ }
+ }
+
+ @NamedParameter(doc = "The driver remote identifier.")
+ public final static class DriverRemoteIdentifier implements Name<String> {
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobImpl.java
new file mode 100644
index 0000000..c41597a
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobImpl.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.CompletedJob;
+import org.apache.reef.client.FailedJob;
+import org.apache.reef.client.JobMessage;
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.client.parameters.JobCompletedHandler;
+import org.apache.reef.client.parameters.JobFailedHandler;
+import org.apache.reef.client.parameters.JobMessageHandler;
+import org.apache.reef.client.parameters.JobRunningHandler;
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.proto.ClientRuntimeProtocol.JobControlProto;
+import org.apache.reef.proto.ClientRuntimeProtocol.Signal;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.proto.ReefServiceProtos.JobStatusProto;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Implementation of RunningJob.
+ */
+@ClientSide
+@Private
+public final class RunningJobImpl implements RunningJob, EventHandler<JobStatusProto> {
+
+ private static final Logger LOG = Logger.getLogger(RunningJob.class.getName());
+
+ private final String jobId;
+
+ private final EventHandler<JobControlProto> jobControlHandler;
+ private final EventHandler<RunningJob> runningJobEventHandler;
+ private final EventHandler<CompletedJob> completedJobEventHandler;
+ private final EventHandler<FailedJob> failedJobEventHandler;
+ private final EventHandler<JobMessage> jobMessageEventHandler;
+ private final ExceptionCodec exceptionCodec;
+
+ @Inject
+ RunningJobImpl(final RemoteManager remoteManager,
+ final @Parameter(DriverIdentifier.class) String driverIdentifier,
+ final @Parameter(REEFImplementation.DriverRemoteIdentifier.class) String driverRID,
+ final @Parameter(JobRunningHandler.class) EventHandler<RunningJob> runningJobEventHandler,
+ final @Parameter(JobCompletedHandler.class) EventHandler<CompletedJob> completedJobEventHandler,
+ final @Parameter(JobFailedHandler.class) EventHandler<FailedJob> failedJobEventHandler,
+ final @Parameter(JobMessageHandler.class) EventHandler<JobMessage> jobMessageEventHandler,
+ final ExceptionCodec exceptionCodec) {
+
+ this.jobId = driverIdentifier;
+ this.runningJobEventHandler = runningJobEventHandler;
+ this.completedJobEventHandler = completedJobEventHandler;
+ this.failedJobEventHandler = failedJobEventHandler;
+ this.jobMessageEventHandler = jobMessageEventHandler;
+ this.exceptionCodec = exceptionCodec;
+ this.jobControlHandler = remoteManager.getHandler(driverRID, JobControlProto.class);
+
+ this.runningJobEventHandler.onNext(this);
+ LOG.log(Level.FINE, "Instantiated 'RunningJobImpl'");
+ }
+
+ @Override
+ public synchronized void close() {
+ this.jobControlHandler.onNext(
+ JobControlProto.newBuilder()
+ .setIdentifier(this.jobId)
+ .setSignal(Signal.SIG_TERMINATE)
+ .build()
+ );
+ }
+
+ @Override
+ public synchronized void close(final byte[] message) {
+ this.jobControlHandler.onNext(
+ JobControlProto.newBuilder()
+ .setIdentifier(this.jobId)
+ .setSignal(Signal.SIG_TERMINATE)
+ .setMessage(ByteString.copyFrom(message))
+ .build()
+ );
+ }
+
+ @Override
+ public String getId() {
+ return this.jobId;
+ }
+
+ @Override
+ public synchronized void send(final byte[] message) {
+ this.jobControlHandler.onNext(
+ JobControlProto.newBuilder()
+ .setIdentifier(this.jobId)
+ .setMessage(ByteString.copyFrom(message))
+ .build()
+ );
+ }
+
+ @Override
+ public synchronized void onNext(final JobStatusProto value) {
+
+ final ReefServiceProtos.State state = value.getState();
+ LOG.log(Level.FINEST, "Received job status: {0} from {1}",
+ new Object[]{state, value.getIdentifier()});
+
+ if (value.hasMessage()) {
+ this.jobMessageEventHandler.onNext(
+ new JobMessage(getId(), value.getMessage().toByteArray()));
+ }
+ if (state == ReefServiceProtos.State.DONE) {
+ this.completedJobEventHandler.onNext(new CompletedJobImpl(this.getId()));
+ } else if (state == ReefServiceProtos.State.FAILED) {
+ this.onJobFailure(value);
+ }
+ }
+
+ /**
+ * Inform the client of a failed job.
+ *
+ * @param jobStatusProto
+ */
+ private synchronized void onJobFailure(final JobStatusProto jobStatusProto) {
+ assert (jobStatusProto.getState() == ReefServiceProtos.State.FAILED);
+
+ final String id = this.jobId;
+ final Optional<byte[]> data = jobStatusProto.hasException() ?
+ Optional.of(jobStatusProto.getException().toByteArray()) :
+ Optional.<byte[]>empty();
+ final Optional<Throwable> cause = this.exceptionCodec.fromBytes(data);
+
+ final String message = cause.isPresent() ?
+ cause.get().getMessage() :
+ "No Message sent by the Job";
+ final Optional<String> description = Optional.of(message);
+
+ final FailedJob failedJob = new FailedJob(id, message, description, cause, data);
+ this.failedJobEventHandler.onNext(failedJob);
+ }
+
+ @Override
+ public String toString() {
+ return "RunningJob{'" + this.jobId + "'}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobs.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobs.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobs.java
new file mode 100644
index 0000000..7c9bbb7
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobs.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+/**
+ * Manages the RunningJobs a client knows about
+ */
+@Private
+@ClientSide
+@DefaultImplementation(RunningJobsImpl.class)
+interface RunningJobs {
+
+ /**
+ * Closes all registered jobs forcefully.
+ */
+ public void closeAllJobs();
+
+ /**
+ * Processes a status message from a Job. If the Job is already known, the message will be passed on. If it is a
+ * first message, a new RunningJob instance will be created for it.
+ *
+ * @param message
+ */
+ public void onJobStatusMessage(final RemoteMessage<ReefServiceProtos.JobStatusProto> message);
+
+ /**
+ * Processes a error message from the resource manager.
+ *
+ * @param runtimeFailure
+ */
+ public void onRuntimeErrorMessage(final RemoteMessage<ReefServiceProtos.RuntimeErrorProto> runtimeFailure);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobsImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobsImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobsImpl.java
new file mode 100644
index 0000000..7a9ed05
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RunningJobsImpl.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.FailedRuntime;
+import org.apache.reef.client.parameters.ResourceManagerErrorHandler;
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@ClientSide
+@Private
+final class RunningJobsImpl implements RunningJobs {
+ private static final Logger LOG = Logger.getLogger(RunningJobsImpl.class.getName());
+ private final Map<String, RunningJobImpl> jobs = new HashMap<>();
+ private final Injector injector;
+ private final InjectionFuture<EventHandler<FailedRuntime>> failedRuntimeEventHandler;
+
+ @Inject
+ RunningJobsImpl(final Injector injector,
+ final @Parameter(ResourceManagerErrorHandler.class) InjectionFuture<EventHandler<FailedRuntime>> failedRuntimeEventHandler) {
+ this.injector = injector;
+ this.failedRuntimeEventHandler = failedRuntimeEventHandler;
+ LOG.log(Level.FINE, "Instantiated 'RunningJobImpl'");
+ }
+
+
+ @Override
+ public synchronized void closeAllJobs() {
+ for (final RunningJobImpl runningJob : this.jobs.values()) {
+ LOG.log(Level.WARNING, "Force close job {0}", runningJob.getId());
+ runningJob.close();
+ }
+ }
+
+ @Override
+ public synchronized void onJobStatusMessage(final RemoteMessage<ReefServiceProtos.JobStatusProto> message) {
+ final ReefServiceProtos.JobStatusProto status = message.getMessage();
+ final String jobIdentifier = status.getIdentifier();
+ LOG.log(Level.FINE, "Processing message from Job: " + jobIdentifier);
+
+ if (status.getState() == ReefServiceProtos.State.INIT) {
+ try {
+ final RunningJobImpl runningJob = this.newRunningJob(status.getIdentifier(), message.getIdentifier().toString());
+ this.put(runningJob);
+ } catch (final BindException | InjectionException configError) {
+ throw new RuntimeException("Configuration error for: " + status, configError);
+ }
+ }
+
+ this.get(jobIdentifier).onNext(status);
+ if ((status.getState() != ReefServiceProtos.State.RUNNING) &&
+ (status.getState() != ReefServiceProtos.State.INIT)) {
+ this.remove(status.getIdentifier());
+ }
+ LOG.log(Level.FINE, "Done processing message from Job " + jobIdentifier);
+ }
+
+ @Override
+ public synchronized void onRuntimeErrorMessage(RemoteMessage<ReefServiceProtos.RuntimeErrorProto> runtimeFailure) {
+ try {
+ this.remove(runtimeFailure.getMessage().getIdentifier());
+ } finally {
+ this.failedRuntimeEventHandler.get().onNext(new FailedRuntime(runtimeFailure.getMessage()));
+ }
+ }
+
+
+ /**
+ * A guarded get() that throws an exception if the RunningJob isn't known
+ *
+ * @param jobIdentifier
+ * @return
+ */
+ private synchronized RunningJobImpl get(final String jobIdentifier) {
+ final RunningJobImpl result = this.jobs.get(jobIdentifier);
+ if (null == result) {
+ throw new RuntimeException("Trying to get a RunningJob that is unknown: " + jobIdentifier);
+ }
+ return result;
+ }
+
+ /**
+ * A guarded remove() that throws an exception if no RunningJob is known for this id.
+ *
+ * @param jobIdentifier
+ */
+ private synchronized void remove(final String jobIdentifier) {
+ final RunningJobImpl result = this.jobs.remove(jobIdentifier);
+ if (null == result) {
+ throw new RuntimeException("Trying to remove a RunningJob that is unknown: " + jobIdentifier);
+ }
+ }
+
+
+ private synchronized void put(final RunningJobImpl runningJob) {
+ final String jobIdentifier = runningJob.getId();
+ if (this.jobs.containsKey(jobIdentifier)) {
+ throw new IllegalStateException("Trying to re-add a job that is already known: " + jobIdentifier);
+ }
+ LOG.log(Level.FINE, "Adding Job with ID: " + jobIdentifier);
+ this.jobs.put(jobIdentifier, runningJob);
+ }
+
+ /**
+ * @param jobIdentifier
+ * @param remoteIdentifier
+ * @return
+ * @throws BindException
+ * @throws InjectionException
+ */
+ private synchronized RunningJobImpl newRunningJob(final String jobIdentifier, final String remoteIdentifier) throws BindException, InjectionException {
+ final Injector child = this.injector.forkInjector();
+ child.bindVolatileParameter(REEFImplementation.DriverRemoteIdentifier.class, remoteIdentifier);
+ child.bindVolatileParameter(DriverIdentifier.class, jobIdentifier);
+ return child.getInstance(RunningJobImpl.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RuntimeErrorProtoHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RuntimeErrorProtoHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RuntimeErrorProtoHandler.java
new file mode 100644
index 0000000..71d7895
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/RuntimeErrorProtoHandler.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Used in REEFImplementation.
+ */
+final class RuntimeErrorProtoHandler implements EventHandler<RemoteMessage<ReefServiceProtos.RuntimeErrorProto>> {
+
+ private final static Logger LOG = Logger.getLogger(RuntimeErrorProtoHandler.class.getName());
+
+ private final InjectionFuture<RunningJobs> runningJobs;
+
+ @Inject
+ RuntimeErrorProtoHandler(final InjectionFuture<RunningJobs> runningJobs) {
+ this.runningJobs = runningJobs;
+ }
+
+
+ @Override
+ public void onNext(final RemoteMessage<ReefServiceProtos.RuntimeErrorProto> error) {
+ LOG.log(Level.WARNING, "{0} Runtime Error: {1}", new Object[]{
+ error.getIdentifier(), error.getMessage().getMessage()});
+ this.runningJobs.get().onRuntimeErrorMessage(error);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/ClientRuntimeParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/ClientRuntimeParameters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/ClientRuntimeParameters.java
new file mode 100644
index 0000000..cdc418b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/ClientRuntimeParameters.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.api;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+public final class ClientRuntimeParameters {
+
+ @NamedParameter(doc = "The runtime error handler RID.")
+ public final static class RuntimeErrorHandlerRID implements Name<String> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
new file mode 100644
index 0000000..af2d064
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.ClientRuntimeProtocol.JobSubmissionProto;
+import org.apache.reef.wake.EventHandler;
+
+@RuntimeAuthor
+public interface JobSubmissionHandler extends EventHandler<JobSubmissionProto>, AutoCloseable {
+
+ @Override
+ public void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/package-info.java
new file mode 100644
index 0000000..7480e61
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Client-Side Event Handlers to be implemented by a specific resourcemanager
+ */
+package org.apache.reef.runtime.common.client.api;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultCompletedJobHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultCompletedJobHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultCompletedJobHandler.java
new file mode 100644
index 0000000..2f4d7d4
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultCompletedJobHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.CompletedJob;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for CompletedJob: Logging it.
+ */
+@Provided
+@ClientSide
+public final class DefaultCompletedJobHandler implements EventHandler<CompletedJob> {
+
+ private static final Logger LOG = Logger.getLogger(DefaultCompletedJobHandler.class.getName());
+
+ @Inject
+ private DefaultCompletedJobHandler() {
+ }
+
+ @Override
+ public void onNext(final CompletedJob job) {
+ LOG.log(Level.INFO, "Job Completed: {0}", job);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultFailedJobHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultFailedJobHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultFailedJobHandler.java
new file mode 100644
index 0000000..076a316
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultFailedJobHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.FailedJob;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default event handler for FailedJob: rethrow the exception.
+ */
+@Provided
+@ClientSide
+public final class DefaultFailedJobHandler implements EventHandler<FailedJob> {
+
+ @Inject
+ private DefaultFailedJobHandler() {
+ }
+
+ @Override
+ public void onNext(final FailedJob job) {
+ throw new RuntimeException("REEF job failed: " + job.getId(), job.getReason().orElse(null));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultJobMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultJobMessageHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultJobMessageHandler.java
new file mode 100644
index 0000000..59a6115
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultJobMessageHandler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.JobMessage;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * Default event handler for job message: Logging it.
+ */
+@Provided
+@ClientSide
+public final class DefaultJobMessageHandler implements EventHandler<JobMessage> {
+
+ private static final Logger LOG = Logger.getLogger(DefaultJobMessageHandler.class.getName());
+
+ @Inject
+ private DefaultJobMessageHandler() {
+ }
+
+ @Override
+ public void onNext(final JobMessage message) {
+ LOG.log(Level.FINE, "Received message: {0}", message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRunningJobHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRunningJobHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRunningJobHandler.java
new file mode 100644
index 0000000..4b94c53
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRunningJobHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for RunningJob: Logging it.
+ */
+@Provided
+@ClientSide
+public final class DefaultRunningJobHandler implements EventHandler<RunningJob> {
+
+ private static final Logger LOG = Logger.getLogger(DefaultRunningJobHandler.class.getName());
+
+ @Inject
+ private DefaultRunningJobHandler() {
+ }
+
+ @Override
+ public void onNext(final RunningJob job) {
+ LOG.log(Level.INFO, "Job is running: {0}", job);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRuntimeErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRuntimeErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRuntimeErrorHandler.java
new file mode 100644
index 0000000..e72706b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultRuntimeErrorHandler.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.FailedRuntime;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for REEF FailedRuntime: rethrow the exception.
+ */
+@Provided
+@ClientSide
+public final class DefaultRuntimeErrorHandler implements EventHandler<FailedRuntime> {
+
+ private static final Logger LOG = Logger.getLogger(DefaultRuntimeErrorHandler.class.getName());
+
+ @Inject
+ private DefaultRuntimeErrorHandler() {
+ }
+
+ @Override
+ public void onNext(final FailedRuntime error) {
+ if (error.getReason().isPresent()) {
+ LOG.log(Level.SEVERE, "Runtime error: " + error, error.getReason().get());
+ } else {
+ LOG.log(Level.SEVERE, "Runtime error: " + error);
+ }
+ throw new RuntimeException("REEF runtime error: " + error, error.asError());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/package-info.java
new file mode 100644
index 0000000..6263ab1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Default implementations for the optional client-side event handlers.
+ */
+package org.apache.reef.runtime.common.client.defaults;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/package-info.java
new file mode 100644
index 0000000..9af9dfe
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of the client-side REEF API.
+ */
+package org.apache.reef.runtime.common.client;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/parameters/ClientPresent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/parameters/ClientPresent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/parameters/ClientPresent.java
new file mode 100644
index 0000000..0d0e361
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/parameters/ClientPresent.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.parameters;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * A Tang named parameter that indicates whether or not a client (handler for the various Job events) is present.
+ */
+@ClientSide
+@NamedParameter(doc = "Indicates whether or not a client is present", default_value = ClientPresent.NO)
+public final class ClientPresent implements Name<String> {
+ public static final String YES = "YES";
+ public static final String NO = "NO";
+}