You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:02 UTC
[29/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
new file mode 100644
index 0000000..c42cba5
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+
+import static org.apache.reef.examples.scheduler.SchedulerREEF.runTaskScheduler;
+
+/**
+ * REEF TaskScheduler on YARN runtime.
+ */
+public final class SchedulerREEFYarn {
+ /**
+ * Launch the scheduler with YARN client configuration
+ * @param args
+ * @throws InjectionException
+ * @throws java.io.IOException
+ */
+ public final static void main(String[] args)
+ throws InjectionException, IOException, ParseException {
+ final Configuration runtimeConfiguration =
+ YarnClientConfiguration.CONF.build();
+ runTaskScheduler(runtimeConfiguration, args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
new file mode 100644
index 0000000..50293e6
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+/**
+ * This class specifies the response from the Scheduler.
+ * It includes the status code and message.
+ */
+final class SchedulerResponse {
+ /**
+ * 200 OK : The request succeeded normally.
+ */
+ private static final int SC_OK = 200;
+
+ /**
+ * 400 BAD REQUEST : The request is syntactically incorrect.
+ */
+ private static final int SC_BAD_REQUEST = 400;
+
+ /**
+ * 403 FORBIDDEN : Syntactically okay but refused to process.
+ */
+ private static final int SC_FORBIDDEN = 403;
+
+ /**
+ * 404 NOT FOUND : The resource is not available.
+ */
+ private static final int SC_NOT_FOUND = 404;
+
+ /**
+ * Create a response with OK status
+ */
+ public static SchedulerResponse OK(final String message){
+ return new SchedulerResponse (SC_OK, message);
+ }
+
+ /**
+ * Create a response with BAD_REQUEST status
+ */
+ public static SchedulerResponse BAD_REQUEST(final String message){
+ return new SchedulerResponse (SC_BAD_REQUEST, message);
+ }
+
+ /**
+ * Create a response with FORBIDDEN status
+ */
+ public static SchedulerResponse FORBIDDEN(final String message){
+ return new SchedulerResponse (SC_FORBIDDEN, message);
+ }
+
+ /**
+ * Create a response with NOT FOUND status
+ */
+ public static SchedulerResponse NOT_FOUND(final String message){
+ return new SchedulerResponse (SC_NOT_FOUND, message);
+ }
+
+ /**
+ * Return {@code true} if the response is OK.
+ */
+ public boolean isOK(){
+ return this.status == SC_OK;
+ }
+
+ /**
+ * Status code of the request based on RFC 2068.
+ */
+ private int status;
+
+ /**
+ * Message to send.
+ */
+ private String message;
+
+ /**
+ * Constructor using status code and message.
+ * @param status
+ * @param message
+ */
+ private SchedulerResponse(final int status, final String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ /**
+ * Return the status code of this response.
+ */
+ int getStatus() {
+ return status;
+ }
+
+ /**
+ * Return the message of this response.
+ */
+ String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
new file mode 100644
index 0000000..fe777ff
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+/**
+ * TaskEntity represent a single entry of task queue used in
+ * scheduler. Since REEF already has the class named {Task},
+ * a different name is used for this class.
+ */
+final class TaskEntity {
+ private final int taskId;
+ private final String command;
+
+ public TaskEntity(final int taskId, final String command) {
+ this.taskId = taskId;
+ this.command = command;
+ }
+
+ /**
+ * Return the TaskID assigned to this Task.
+ */
+ int getId() {
+ return taskId;
+ }
+
+ String getCommand() {
+ return command;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TaskEntity that = (TaskEntity) o;
+
+ if (taskId != that.taskId) return false;
+ if (!command.equals(that.command)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = taskId;
+ result = 31 * result + command.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("<Id=").append(taskId).
+ append(", Command=").append(command).append(">").toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
new file mode 100644
index 0000000..728806c
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task scheduler example based on reef-webserver
+ */
+package org.apache.reef.examples.scheduler;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java
new file mode 100644
index 0000000..db312ce
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.LoggingEventHandler;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public final class Control {
+
+ private static final Logger LOG = Logger.getLogger(Control.class.getName());
+ private final transient String command;
+ private final transient String taskId;
+ private final transient int port;
+
+ @Inject
+ public Control(@Parameter(SuspendClientControl.Port.class) final int port,
+ @Parameter(TaskId.class) final String taskId,
+ @Parameter(Command.class) final String command) {
+ this.command = command.trim().toLowerCase();
+ this.taskId = taskId;
+ this.port = port;
+ }
+
+ private static Configuration getConfig(final String[] args) throws IOException, BindException {
+ final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+ new CommandLine(cb).processCommandLine(args, SuspendClientControl.Port.class, TaskId.class, Command.class);
+ return cb.build();
+ }
+
+ public static void main(final String[] args) throws Exception {
+ final Configuration config = getConfig(args);
+ final Injector injector = Tang.Factory.getTang().newInjector(config);
+ final Control control = injector.getInstance(Control.class);
+ control.run();
+ }
+
+ public void run() throws Exception {
+
+ LOG.log(Level.INFO, "command: {0} task: {1} port: {2}",
+ new Object[]{this.command, this.taskId, this.port});
+
+ final ObjectSerializableCodec<String> codec = new ObjectSerializableCodec<>();
+
+ final EStage<TransportEvent> stage = new ThreadPoolStage<>("suspend-control-client",
+ new LoggingEventHandler<TransportEvent>(), 1, new EventHandler<Throwable>() {
+ @Override
+ public void onNext(final Throwable throwable) {
+ throw new RuntimeException(throwable);
+ }
+ });
+
+ try (final Transport transport = new NettyMessagingTransport("localhost", 0, stage, stage, 1, 10000)) {
+ final Link link = transport.open(new InetSocketAddress("localhost", this.port), codec, null);
+ link.write(this.command + " " + this.taskId);
+ }
+ }
+
+ @NamedParameter(doc = "Task id", short_name = "task")
+ public static final class TaskId implements Name<String> {
+ }
+
+ @NamedParameter(doc = "Command: 'suspend' or 'resume'", short_name = "cmd")
+ public static final class Command implements Name<String> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Launch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Launch.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Launch.java
new file mode 100644
index 0000000..696d02d
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Launch.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.client.ClientConfiguration;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.tang.formats.CommandLine;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Suspend/Resume example - main class.
+ */
+public final class Launch {
+
+ /**
+ * Standard Java logger
+ */
+ private static final Logger LOG = Logger.getLogger(Launch.class.getName());
+ /**
+ * Number of REEF worker threads in local mode.
+ */
+ private static final int NUM_LOCAL_THREADS = 4;
+
+ /**
+ * This class should not be instantiated.
+ */
+ private Launch() {
+ throw new RuntimeException("Do not instantiate this class!");
+ }
+
+ /**
+ * @param args command line arguments, as passed to main()
+ * @return Configuration object.
+ */
+ private static Configuration parseCommandLine(final String[] args)
+ throws IOException, BindException {
+ final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+ final CommandLine cl = new CommandLine(confBuilder);
+ cl.registerShortNameOfClass(Local.class);
+ cl.registerShortNameOfClass(NumCycles.class);
+ cl.registerShortNameOfClass(Delay.class);
+ cl.registerShortNameOfClass(SuspendClientControl.Port.class);
+ cl.processCommandLine(args);
+ return confBuilder.build();
+ }
+
+ private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
+ throws InjectionException, BindException {
+ final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+ final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+ cb.bindNamedParameter(NumCycles.class, String.valueOf(injector.getNamedInstance(NumCycles.class)));
+ cb.bindNamedParameter(Delay.class, String.valueOf(injector.getNamedInstance(Delay.class)));
+ cb.bindNamedParameter(SuspendClientControl.Port.class,
+ String.valueOf(injector.getNamedInstance(SuspendClientControl.Port.class)));
+ return cb.build();
+ }
+
+ /**
+ * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
+ *
+ * @param args Command line arguments, as passed into main().
+ * @return (immutable) TANG Configuration object.
+ * @throws BindException if configuration commandLineInjector fails.
+ * @throws InjectionException if configuration commandLineInjector fails.
+ * @throws IOException error reading the configuration.
+ */
+ private static Configuration getClientConfiguration(final String[] args)
+ throws BindException, InjectionException, IOException {
+ final Configuration commandLineConf = parseCommandLine(args);
+
+ final Configuration clientConfiguration = ClientConfiguration.CONF
+ .set(ClientConfiguration.ON_JOB_RUNNING, SuspendClient.RunningJobHandler.class)
+ .set(ClientConfiguration.ON_JOB_FAILED, SuspendClient.FailedJobHandler.class)
+ .set(ClientConfiguration.ON_JOB_COMPLETED, SuspendClient.CompletedJobHandler.class)
+ .set(ClientConfiguration.ON_RUNTIME_ERROR, SuspendClient.RuntimeErrorHandler.class)
+ .build();
+
+ // TODO: Remove the injector, have stuff injected.
+ final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
+ final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
+ final Configuration runtimeConfiguration;
+ if (isLocal) {
+ LOG.log(Level.INFO, "Running on the local runtime");
+ runtimeConfiguration = LocalRuntimeConfiguration.CONF
+ .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+ .build();
+ } else {
+ LOG.log(Level.INFO, "Running on YARN");
+ runtimeConfiguration = YarnClientConfiguration.CONF.build();
+ }
+
+ return Tang.Factory.getTang()
+ .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
+ cloneCommandLineConfiguration(commandLineConf))
+ .build();
+ }
+
+ /**
+ * Main method that runs the example.
+ *
+ * @param args command line parameters.
+ */
+ public static void main(final String[] args) {
+ try {
+ final Configuration config = getClientConfiguration(args);
+
+ LOG.log(Level.INFO, "Configuration:\n--\n{0}--",
+ new AvroConfigurationSerializer().toString(config));
+
+ final Injector injector = Tang.Factory.getTang().newInjector(config);
+ final SuspendClient client = injector.getInstance(SuspendClient.class);
+
+ client.submit();
+ client.waitForCompletion();
+ LOG.info("Done!");
+
+ } catch (final BindException | IOException | InjectionException ex) {
+ LOG.log(Level.SEVERE, "Cannot launch: configuration error", ex);
+ } catch (final Exception ex) {
+ LOG.log(Level.SEVERE, "Cleanup error", ex);
+ }
+ }
+
+ /**
+ * Command line parameter = true to run locally, or false to run on YARN.
+ */
+ @NamedParameter(doc = "Whether or not to run on the local runtime",
+ short_name = "local", default_value = "true")
+ public static final class Local implements Name<Boolean> {
+ }
+
+ /**
+ * Command line parameter: number of iterations to run.
+ */
+ @NamedParameter(doc = "Number of iterations to run", short_name = "cycles", default_value = "20")
+ public static final class NumCycles implements Name<Integer> {
+ }
+
+ /**
+ * Command line parameter: delay in seconds for each cycle.
+ */
+ @NamedParameter(doc = "Delay in seconds between the cycles", short_name = "delay", default_value = "1")
+ public static final class Delay implements Name<Integer> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/ObjectWritableCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/ObjectWritableCodec.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/ObjectWritableCodec.java
new file mode 100644
index 0000000..72b257a
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/ObjectWritableCodec.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
+
+import java.io.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Codec for Hadoop Writable object serialization.
+ *
+ * @param <T> Class derived from Hadoop Writable.
+ */
+public class ObjectWritableCodec<T extends Writable> implements Codec<T> {
+
+ /**
+ * Standard Java logger
+ */
+ private static final Logger LOG = Logger.getLogger(ObjectWritableCodec.class.getName());
+
+ /**
+ * we need it to invoke the class constructor.
+ */
+ private final Class<? extends T> writableClass;
+
+ /**
+ * Create a new codec for Hadoop Writables.
+ *
+ * @param clazz we need it to invoke the class constructor.
+ */
+ public ObjectWritableCodec(final Class<? extends T> clazz) {
+ this.writableClass = clazz;
+ }
+
+ /**
+ * Encodes Hadoop Writable object into a byte array.
+ *
+ * @param writable the object to encode.
+ * @return serialized object as byte array.
+ * @throws RemoteRuntimeException if serialization fails.
+ */
+ @Override
+ public byte[] encode(T writable) {
+ try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ final DataOutputStream dos = new DataOutputStream(bos)) {
+ writable.write(dos);
+ return bos.toByteArray();
+ } catch (final IOException ex) {
+ LOG.log(Level.SEVERE, "Cannot encode object " + writable, ex);
+ throw new RemoteRuntimeException(ex);
+ }
+ }
+
+ /**
+ * Decode Hadoop Writable object from a byte array.
+ *
+ * @param buffer serialized version of the Writable object (as a byte array).
+ * @return a Writable object.
+ * @throws RemoteRuntimeException if deserialization fails.
+ */
+ @Override
+ public T decode(byte[] buffer) {
+ try (final ByteArrayInputStream bis = new ByteArrayInputStream(buffer);
+ final DataInputStream dis = new DataInputStream(bis)) {
+ final T writable = this.writableClass.newInstance();
+ writable.readFields(dis);
+ return writable;
+ } catch (final IOException | InstantiationException | IllegalAccessException ex) {
+ LOG.log(Level.SEVERE, "Cannot decode class " + this.writableClass, ex);
+ throw new RemoteRuntimeException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java
new file mode 100644
index 0000000..e57e9b5
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.client.*;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+public class SuspendClient {
+
+ /**
+ * Standard java logger.
+ */
+ private final static Logger LOG = Logger.getLogger(SuspendClient.class.getName());
+
+ /**
+ * Job Driver configuration.
+ */
+ private final Configuration driverConfig;
+
+ /**
+ * Reference to the REEF framework.
+ */
+ private final REEF reef;
+
+ /**
+ * Controller that listens for suspend/resume commands on a specified port.
+ */
+ private final SuspendClientControl controlListener;
+
+ /**
+ * @param reef reference to the REEF framework.
+ * @param port port to listen to for suspend/resume commands.
+ * @param numCycles number of cycles to run in the task.
+ * @param delay delay in seconds between cycles in the task.
+ */
+ @Inject
+ SuspendClient(
+ final REEF reef,
+ final @Parameter(SuspendClientControl.Port.class) int port,
+ final @Parameter(Launch.NumCycles.class) int numCycles,
+ final @Parameter(Launch.Delay.class) int delay) throws BindException, IOException {
+
+ final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles))
+ .bindNamedParameter(Launch.Delay.class, Integer.toString(delay));
+
+ cb.addConfiguration(DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SuspendDriver.class))
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "suspend-" + System.currentTimeMillis())
+ .set(DriverConfiguration.ON_TASK_RUNNING, SuspendDriver.RunningTaskHandler.class)
+ .set(DriverConfiguration.ON_TASK_COMPLETED, SuspendDriver.CompletedTaskHandler.class)
+ .set(DriverConfiguration.ON_TASK_SUSPENDED, SuspendDriver.SuspendedTaskHandler.class)
+ .set(DriverConfiguration.ON_TASK_MESSAGE, SuspendDriver.TaskMessageHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SuspendDriver.AllocatedEvaluatorHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SuspendDriver.ActiveContextHandler.class)
+ .set(DriverConfiguration.ON_CLIENT_MESSAGE, SuspendDriver.ClientMessageHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_STARTED, SuspendDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_STOP, SuspendDriver.StopHandler.class)
+ .build());
+
+ this.driverConfig = cb.build();
+ this.reef = reef;
+ this.controlListener = new SuspendClientControl(port);
+ }
+
+ /**
+ * Start the job driver.
+ */
+ public void submit() {
+ LOG.info("Start the job driver");
+ this.reef.submit(this.driverConfig);
+ }
+
+ /**
+ * Wait for the job to complete.
+ */
+ public void waitForCompletion() throws Exception {
+ LOG.info("Waiting for the Job Driver to complete.");
+ try {
+ synchronized (this) {
+ this.wait();
+ }
+ } catch (final InterruptedException ex) {
+ LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
+ }
+ this.reef.close();
+ this.controlListener.close();
+ }
+
+ /**
+ * Receive notification from the driver that the job is about to run.
+ * RunningJob object is a proxy to the running job driver that can be used for sending messages.
+ */
+ final class RunningJobHandler implements EventHandler<RunningJob> {
+ @Override
+ public void onNext(final RunningJob job) {
+ LOG.log(Level.INFO, "Running job: {0}", job.getId());
+ SuspendClient.this.controlListener.setRunningJob(job);
+ }
+ }
+
+ /**
+ * Receive notification from the driver that the job had failed.
+ * <p/>
+ * FailedJob is a proxy for the failed job driver
+ * (contains job ID and exception thrown from the driver).
+ */
+ final class FailedJobHandler implements EventHandler<FailedJob> {
+ @Override
+ public void onNext(final FailedJob job) {
+ LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null));
+ synchronized (SuspendClient.this) {
+ SuspendClient.this.notify();
+ }
+ }
+ }
+
+ /**
+ * Receive notification from the driver that the job had completed successfully.
+ */
+ final class CompletedJobHandler implements EventHandler<CompletedJob> {
+ @Override
+ public void onNext(final CompletedJob job) {
+ LOG.log(Level.INFO, "Completed job: {0}", job.getId());
+ synchronized (SuspendClient.this) {
+ SuspendClient.this.notify();
+ }
+ }
+ }
+
+ /**
+ * Receive notification that there was an exception thrown from the job driver.
+ */
+ final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
+ @Override
+ public void onNext(final FailedRuntime error) {
+ LOG.log(Level.SEVERE, "ERROR: " + error, error.getReason().orElse(null));
+ synchronized (SuspendClient.class) {
+ SuspendClient.this.notify();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java
new file mode 100644
index 0000000..accb5a8
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * (Wake) listener to get suspend/resume commands from Control process.
+ */
+public class SuspendClientControl implements AutoCloseable {
+
+ private static final Logger LOG = Logger.getLogger(Control.class.getName());
+ private static final ObjectSerializableCodec<byte[]> CODEC = new ObjectSerializableCodec<>();
+ private final transient Transport transport;
+ private transient RunningJob runningJob;
+
+ @Inject
+ public SuspendClientControl(
+ final @Parameter(SuspendClientControl.Port.class) int port) throws IOException {
+
+ LOG.log(Level.INFO, "Listen to control port {0}", port);
+
+ final EStage<TransportEvent> stage = new ThreadPoolStage<>(
+ "suspend-control-server", new ControlMessageHandler(), 1, new EventHandler<Throwable>() {
+ @Override
+ public void onNext(final Throwable throwable) {
+ throw new RuntimeException(throwable);
+ }
+ });
+
+ this.transport = new NettyMessagingTransport("localhost", port, stage, stage, 1, 10000);
+ }
+
+ public synchronized void setRunningJob(final RunningJob job) {
+ this.runningJob = job;
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.transport.close();
+ }
+
+ @NamedParameter(doc = "Port for suspend/resume control commands",
+ short_name = "port", default_value = "7008")
+ public static final class Port implements Name<Integer> {
+ }
+
+ /**
+ * Forward remote message to the job driver.
+ */
+ private class ControlMessageHandler implements EventHandler<TransportEvent> {
+ @Override
+ public synchronized void onNext(final TransportEvent msg) {
+ LOG.log(Level.INFO, "Control message: {0} destination: {1}",
+ new Object[]{CODEC.decode(msg.getData()), runningJob});
+ if (runningJob != null) {
+ runningJob.send(msg.getData());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendDriver.java
new file mode 100644
index 0000000..38d7542
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendDriver.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.checkpoint.fs.FSCheckPointServiceConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import javax.xml.bind.DatatypeConverter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Suspend/resume example job driver. Execute a simple task in all evaluators,
+ * and sendEvaluatorControlMessage suspend/resume events properly.
+ */
+@Unit
+public class SuspendDriver {
+
+ /**
+ * Standard Java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(SuspendDriver.class.getName());
+
+ /**
+ * Number of evaluators to request
+ */
+ private static final int NUM_EVALUATORS = 2;
+
+ /**
+ * String codec is used to encode the results driver sends to the client.
+ */
+ private static final ObjectSerializableCodec<String> CODEC_STR = new ObjectSerializableCodec<>();
+
+ /**
+ * Integer codec is used to decode the results driver gets from the tasks.
+ */
+ private static final ObjectSerializableCodec<Integer> CODEC_INT = new ObjectSerializableCodec<>();
+
+ /**
+ * Job observer on the client.
+ * We use it to send results from the driver back to the client.
+ */
+ private final JobMessageObserver jobMessageObserver;
+
+ /**
+ * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
+ */
+ private final EvaluatorRequestor evaluatorRequestor;
+
+ /**
+ * TANG Configuration of the Task.
+ */
+ private final Configuration contextConfig;
+
+ /**
+ * Map from task ID (a string) to the TaskRuntime instance (that can be suspended).
+ */
+ private final Map<String, RunningTask> runningTasks =
+ Collections.synchronizedMap(new HashMap<String, RunningTask>());
+
+ /**
+ * Map from task ID (a string) to the SuspendedTask instance (that can be resumed).
+ */
+ private final Map<String, SuspendedTask> suspendedTasks = new HashMap<>();
+
+ /**
+ * Job driver constructor.
+ * All parameters are injected from TANG automatically.
+ *
+ * @param evaluatorRequestor is used to request Evaluators.
+ * @param numCycles number of cycles to run in the task.
+ * @param delay delay in seconds between cycles in the task.
+ */
+ @Inject
+ SuspendDriver(
+ final JobMessageObserver jobMessageObserver,
+ final EvaluatorRequestor evaluatorRequestor,
+ final @Parameter(Launch.Local.class) boolean isLocal,
+ final @Parameter(Launch.NumCycles.class) int numCycles,
+ final @Parameter(Launch.Delay.class) int delay) {
+
+ this.jobMessageObserver = jobMessageObserver;
+ this.evaluatorRequestor = evaluatorRequestor;
+
+ try {
+
+ final Configuration checkpointServiceConfig = FSCheckPointServiceConfiguration.CONF
+ .set(FSCheckPointServiceConfiguration.IS_LOCAL, Boolean.toString(isLocal))
+ .set(FSCheckPointServiceConfiguration.PATH, "/tmp")
+ .set(FSCheckPointServiceConfiguration.PREFIX, "reef-checkpoint-")
+ .set(FSCheckPointServiceConfiguration.REPLICATION_FACTOR, "3")
+ .build();
+
+ final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles))
+ .bindNamedParameter(Launch.Delay.class, Integer.toString(delay));
+
+ cb.addConfiguration(checkpointServiceConfig);
+ this.contextConfig = cb.build();
+
+ } catch (final BindException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Receive notification that the Task is ready to run.
+ */
+ final class RunningTaskHandler implements EventHandler<RunningTask> {
+ @Override
+ public final void onNext(final RunningTask task) {
+ LOG.log(Level.INFO, "Running task: {0}", task.getId());
+ runningTasks.put(task.getId(), task);
+ jobMessageObserver.sendMessageToClient(CODEC_STR.encode("start task: " + task.getId()));
+ }
+ }
+
+ /**
+ * Receive notification that the Task has completed successfully.
+ */
+ final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+ @Override
+ public final void onNext(final CompletedTask task) {
+
+ final EvaluatorDescriptor e = task.getActiveContext().getEvaluatorDescriptor();
+ final String msg = "Task completed " + task.getId() + " on node " + e;
+ LOG.info(msg);
+
+ jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
+ runningTasks.remove(task.getId());
+ task.getActiveContext().close();
+
+ final boolean noTasks;
+
+ synchronized (suspendedTasks) {
+ LOG.log(Level.INFO, "Tasks running: {0} suspended: {1}", new Object[]{
+ runningTasks.size(), suspendedTasks.size()});
+ noTasks = runningTasks.isEmpty() && suspendedTasks.isEmpty();
+ }
+
+ if (noTasks) {
+ LOG.info("All tasks completed; shutting down.");
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the Task has been suspended.
+ */
+ final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
+ @Override
+ public final void onNext(final SuspendedTask task) {
+
+ final String msg = "Task suspended: " + task.getId();
+ LOG.info(msg);
+
+ synchronized (suspendedTasks) {
+ suspendedTasks.put(task.getId(), task);
+ runningTasks.remove(task.getId());
+ }
+
+ jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
+ }
+ }
+
+ /**
+ * Receive message from the Task.
+ */
+ final class TaskMessageHandler implements EventHandler<TaskMessage> {
+ @Override
+ public void onNext(final TaskMessage message) {
+ final int result = CODEC_INT.decode(message.get());
+ final String msg = "Task message " + message.getId() + ": " + result;
+ LOG.info(msg);
+ jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
+ }
+ }
+
+ /**
+ * Receive notification that an Evaluator had been allocated,
+ * and submitTask a new Task in that Evaluator.
+ */
+ final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator eval) {
+ try {
+
+ LOG.log(Level.INFO, "Allocated Evaluator: {0}", eval.getId());
+
+ final Configuration thisContextConfiguration = ContextConfiguration.CONF.set(
+ ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build();
+
+ eval.submitContext(Tang.Factory.getTang()
+ .newConfigurationBuilder(thisContextConfiguration, contextConfig).build());
+
+ } catch (final BindException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ /**
+ * Receive notification that a new Context is available.
+ * Submit a new Task to that Context.
+ */
+ final class ActiveContextHandler implements EventHandler<ActiveContext> {
+ @Override
+ public synchronized void onNext(final ActiveContext context) {
+ LOG.log(Level.INFO, "Active Context: {0}", context.getId());
+ try {
+ context.submitTask(TaskConfiguration.CONF
+ .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
+ .set(TaskConfiguration.TASK, SuspendTestTask.class)
+ .set(TaskConfiguration.ON_SUSPEND, SuspendTestTask.SuspendHandler.class)
+ .build());
+ } catch (final BindException ex) {
+ LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ /**
+ * Handle notifications from the client.
+ */
+ final class ClientMessageHandler implements EventHandler<byte[]> {
+ @Override
+ public void onNext(final byte[] message) {
+
+ final String commandStr = CODEC_STR.decode(message);
+ LOG.log(Level.INFO, "Client message: {0}", commandStr);
+
+ final String[] split = commandStr.split("\\s+", 2);
+ if (split.length != 2) {
+ throw new IllegalArgumentException("Bad command: " + commandStr);
+ } else {
+
+ final String command = split[0].toLowerCase().intern();
+ final String taskId = split[1];
+
+ switch (command) {
+
+ case "suspend": {
+ final RunningTask task = runningTasks.get(taskId);
+ if (task != null) {
+ task.suspend();
+ } else {
+ throw new IllegalArgumentException("Suspend: Task not found: " + taskId);
+ }
+ break;
+ }
+
+ case "resume": {
+ final SuspendedTask suspendedTask;
+ synchronized (suspendedTasks) {
+ suspendedTask = suspendedTasks.remove(taskId);
+ }
+ if (suspendedTask != null) {
+ try {
+ suspendedTask.getActiveContext().submitTask(TaskConfiguration.CONF
+ .set(TaskConfiguration.IDENTIFIER, taskId)
+ .set(TaskConfiguration.MEMENTO,
+ DatatypeConverter.printBase64Binary(suspendedTask.get()))
+ .build());
+ } catch (final BindException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new IllegalArgumentException("Resume: Task not found: " + taskId);
+ }
+ break;
+ }
+
+ default:
+ throw new IllegalArgumentException("Bad command: " + command);
+ }
+ }
+ }
+ }
+
+ /**
+ * Job Driver is ready and the clock is set up: request the evaluators.
+ */
+ final class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime time) {
+ LOG.log(Level.INFO, "StartTime: {0}", time);
+ evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
+ .setMemory(128).setNumberOfCores(1).setNumber(NUM_EVALUATORS).build());
+ }
+ }
+
+ /**
+ * Shutting down the job driver: close the evaluators.
+ */
+ final class StopHandler implements EventHandler<StopTime> {
+ @Override
+ public void onNext(final StopTime time) {
+ LOG.log(Level.INFO, "StopTime: {0}", time);
+ jobMessageObserver.sendMessageToClient(CODEC_STR.encode("got StopTime"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendTestTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendTestTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendTestTask.java
new file mode 100644
index 0000000..5eedf1f
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendTestTask.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.io.checkpoint.CheckpointID;
+import org.apache.reef.io.checkpoint.CheckpointService;
+import org.apache.reef.io.checkpoint.CheckpointService.CheckpointReadChannel;
+import org.apache.reef.io.checkpoint.CheckpointService.CheckpointWriteChannel;
+import org.apache.reef.io.checkpoint.fs.FSCheckpointID;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.Task;
+import org.apache.reef.task.TaskMessage;
+import org.apache.reef.task.TaskMessageSource;
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Simple do-nothing task that can send messages to the Driver and can be suspended/resumed.
+ */
+@Unit
+public class SuspendTestTask implements Task, TaskMessageSource {
+
+ /**
+ * Standard java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(SuspendTestTask.class.getName());
+ private final CheckpointService checkpointService;
+ /**
+ * number of cycles to run in the task.
+ */
+ private final int numCycles;
+ /**
+ * delay in milliseconds between cycles in the task.
+ */
+ private final int delay;
+ /**
+ * Codec to serialize/deserialize counter values for the updates.
+ */
+ private final ObjectSerializableCodec<Integer> codecInt = new ObjectSerializableCodec<>();
+ /**
+ * Codec to serialize/deserialize checkpoint IDs for suspend/resume.
+ */
+ private final ObjectWritableCodec<CheckpointID> codecCheckpoint =
+ new ObjectWritableCodec<CheckpointID>(FSCheckpointID.class);
+ /**
+ * Current value of the counter.
+ */
+ private int counter = 0;
+ /**
+ * True if the suspend message has been received, false otherwise.
+ */
+ private boolean suspended = false;
+
+ /**
+ * Task constructor: invoked by TANG.
+ *
+ * @param numCycles number of cycles to run in the task.
+ * @param delay delay in seconds between cycles in the task.
+ */
+ @Inject
+ public SuspendTestTask(
+ final CheckpointService checkpointService,
+ @Parameter(Launch.NumCycles.class) final int numCycles,
+ @Parameter(Launch.Delay.class) final int delay) {
+ this.checkpointService = checkpointService;
+ this.numCycles = numCycles;
+ this.delay = delay * 1000;
+ }
+
+ /**
+ * Main method of the task: run cycle from 0 to numCycles,
+ * and sleep for delay seconds on each cycle.
+ *
+ * @param memento serialized version of the counter.
+ * Empty array for initial run, but can contain value for resumed job.
+ * @return serialized version of the counter.
+ */
+ @Override
+ public synchronized byte[] call(final byte[] memento) throws IOException, InterruptedException {
+
+ LOG.log(Level.INFO, "Start: {0} counter: {1}/{2}",
+ new Object[]{this, this.counter, this.numCycles});
+
+ if (memento != null && memento.length > 0) {
+ this.restore(memento);
+ }
+
+ this.suspended = false;
+ for (; this.counter < this.numCycles && !this.suspended; ++this.counter) {
+ try {
+ LOG.log(Level.INFO, "Run: {0} counter: {1}/{2} sleep: {3}",
+ new Object[]{this, this.counter, this.numCycles, this.delay});
+ this.wait(this.delay);
+ } catch (final InterruptedException ex) {
+ LOG.log(Level.INFO, "{0} interrupted. counter: {1}: {2}",
+ new Object[]{this, this.counter, ex});
+ }
+ }
+
+ return this.suspended ? this.save() : this.codecInt.encode(this.counter);
+ }
+
+ /**
+ * Update driver on current state of the task.
+ *
+ * @return serialized version of the counter.
+ */
+ @Override
+ public synchronized Optional<TaskMessage> getMessage() {
+ LOG.log(Level.INFO, "Message from Task {0} to the Driver: counter: {1}",
+ new Object[]{this, this.counter});
+ return Optional.of(TaskMessage.from(SuspendTestTask.class.getName(), this.codecInt.encode(this.counter)));
+ }
+
+ /**
+ * Save current state of the task in the checkpoint.
+ *
+ * @return checkpoint ID (serialized)
+ */
+ private synchronized byte[] save() throws IOException, InterruptedException {
+ try (final CheckpointWriteChannel channel = this.checkpointService.create()) {
+ channel.write(ByteBuffer.wrap(this.codecInt.encode(this.counter)));
+ return this.codecCheckpoint.encode(this.checkpointService.commit(channel));
+ }
+ }
+
+ /**
+ * Restore the task state from the given checkpoint.
+ *
+ * @param memento serialized checkpoint ID
+ */
+ private synchronized void restore(final byte[] memento) throws IOException, InterruptedException {
+ final CheckpointID checkpointId = this.codecCheckpoint.decode(memento);
+ try (final CheckpointReadChannel channel = this.checkpointService.open(checkpointId)) {
+ final ByteBuffer buffer = ByteBuffer.wrap(this.codecInt.encode(this.counter));
+ channel.read(buffer);
+ this.counter = this.codecInt.decode(buffer.array());
+ }
+ this.checkpointService.delete(checkpointId);
+ }
+
+ public class SuspendHandler implements EventHandler<SuspendEvent> {
+
+ @Override
+ public void onNext(SuspendEvent suspendEvent) {
+ final byte[] message = suspendEvent.get().get();
+ LOG.log(Level.INFO, "Suspend: {0} with: {1} bytes; counter: {2}",
+ new Object[]{this, message.length, SuspendTestTask.this.counter});
+ SuspendTestTask.this.suspended = true;
+ SuspendTestTask.this.notify();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/package-info.java
new file mode 100644
index 0000000..9017daa
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * suspend/resume demo.
+ */
+package org.apache.reef.examples.suspend;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
new file mode 100644
index 0000000..e6a5d6c
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.utils.wake;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An EventHandler that blocks until a set number of Events has been received.
+ * Once they have been received, the downstream event handler is called with an
+ * Iterable of the events spooled.
+ *
+ * @param <T>
+ */
+public final class BlockingEventHandler<T> implements EventHandler<T> {
+
+ private final int expectedSize;
+ private final EventHandler<Iterable<T>> destination;
+ private List<T> events = new ArrayList<>();
+
+ public BlockingEventHandler(final int expectedSize, final EventHandler<Iterable<T>> destination) {
+ this.expectedSize = expectedSize;
+ this.destination = destination;
+ }
+
+ @Override
+ public final void onNext(final T event) {
+ if (this.isComplete()) {
+ throw new IllegalStateException("Received more Events than expected");
+ }
+ this.events.add(event);
+ if (this.isComplete()) {
+ this.destination.onNext(events);
+ this.reset();
+ }
+ }
+
+ private boolean isComplete() {
+ return this.events.size() >= expectedSize;
+ }
+
+ private void reset() {
+ this.events = new ArrayList<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
new file mode 100644
index 0000000..9d2d5b8
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.utils.wake;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An EventHandler that logs its events before handing it to a downstream
+ * EventHandler.
+ *
+ * @param <T>
+ */
+public class LoggingEventHandler<T> implements EventHandler<T> {
+
+ private final EventHandler<T> downstreamEventHandler;
+ private final String prefix;
+ private final String suffix;
+
+ /**
+ * @param prefix to be logged before the event
+ * @param downstreamEventHandler the event handler to hand the event to
+ * @param suffix to be logged after the event
+ */
+ public LoggingEventHandler(final String prefix, EventHandler<T> downstreamEventHandler, final String suffix) {
+ this.downstreamEventHandler = downstreamEventHandler;
+ this.prefix = prefix;
+ this.suffix = suffix;
+ }
+
+ public LoggingEventHandler(final EventHandler<T> downstreamEventHandler) {
+ this("", downstreamEventHandler, "");
+ }
+
+ @Override
+ public void onNext(final T value) {
+ Logger.getLogger(LoggingEventHandler.class.getName()).log(Level.INFO, prefix + value.toString() + suffix);
+ this.downstreamEventHandler.onNext(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/test/java/org/apache/reef/examples/hello/HelloHttpTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/test/java/org/apache/reef/examples/hello/HelloHttpTest.java b/lang/java/reef-examples/src/test/java/org/apache/reef/examples/hello/HelloHttpTest.java
new file mode 100644
index 0000000..14812ca
--- /dev/null
+++ b/lang/java/reef-examples/src/test/java/org/apache/reef/examples/hello/HelloHttpTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.examples.hellohttp.HelloREEFHttp;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HelloHttpTest {
+ @Test
+ public void testHttpServer() throws BindException, InjectionException {
+
+ final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF
+ .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 2)
+ .build();
+
+ final LauncherStatus status = HelloREEFHttp.runHelloReef(runtimeConfiguration, 10 * 1000);
+ Assert.assertEquals(LauncherStatus.FORCE_CLOSED, status); // must be force closed by timeout
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/test/java/org/apache/reef/examples/suspend/ObjectWritableCodecTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/test/java/org/apache/reef/examples/suspend/ObjectWritableCodecTest.java b/lang/java/reef-examples/src/test/java/org/apache/reef/examples/suspend/ObjectWritableCodecTest.java
new file mode 100644
index 0000000..827d57a
--- /dev/null
+++ b/lang/java/reef-examples/src/test/java/org/apache/reef/examples/suspend/ObjectWritableCodecTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.io.checkpoint.CheckpointID;
+import org.apache.reef.io.checkpoint.fs.FSCheckpointID;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ObjectWritableCodecTest {
+
+ private static ObjectWritableCodec<CheckpointID> codec;
+
+ /**
+ * Test class setup - create the codec.
+ */
+ @BeforeClass
+ public static void setUpClass() {
+ codec = new ObjectWritableCodec<CheckpointID>(FSCheckpointID.class);
+ }
+
+ /**
+ * After the encode/decode cycle result equals to the original object.
+ */
+ @Test
+ public void testFSCheckpointIdCodec() {
+ final CheckpointID checkpoint1 = new FSCheckpointID(new Path("path"));
+ final byte[] serialized = codec.encode(checkpoint1);
+ final CheckpointID checkpoint2 = codec.decode(serialized);
+ Assert.assertEquals(checkpoint1, checkpoint2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/pom.xml b/lang/java/reef-io/pom.xml
new file mode 100644
index 0000000..c517e38
--- /dev/null
+++ b/lang/java/reef-io/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-project</artifactId>
+ <version>0.11.0-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>reef-io</artifactId>
+ <name>REEF IO</name>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-sources</id>
+ <phase>generate-sources</phase>
+ <configuration>
+ <tasks>
+ <mkdir dir="target/generated-sources/proto"/>
+ <exec executable="protoc">
+ <arg value="--proto_path=src/main/proto/"/>
+ <arg value="--java_out=target/generated-sources/proto"/>
+ <arg value="src/main/proto/ns_protocol.proto"/>
+ </exec>
+ </tasks>
+ <sourceRoot>target/generated-sources/proto</sourceRoot>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/proto</source>
+ <source>target/generated-sources/avro</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-webserver</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <!-- HADOOP -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn</artifactId>
+ <version>${hadoop.version}</version>
+ <type>pom</type>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- END OF HADOOP -->
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/avro/nameservice.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/avro/nameservice.avsc b/lang/java/reef-io/src/main/avro/nameservice.avsc
new file mode 100644
index 0000000..9a10478
--- /dev/null
+++ b/lang/java/reef-io/src/main/avro/nameservice.avsc
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ [
+{
+ "namespace":"org.apache.reef.io.network.naming.avro",
+ "type":"record",
+ "name":"AvroNamingLookupRequest",
+ "fields":[
+ {"name":"ids","type":{"type":"array", "items":"string"}}
+ ]
+},
+{
+ "namespace":"org.apache.reef.io.network.naming.avro",
+ "type":"record",
+ "name":"AvroNamingAssignment",
+ "fields":[
+ {"name":"id","type":"string"},
+ {"name":"host","type":"string"},
+ {"name":"port","type":"int"}
+ ]
+},
+{
+ "namespace":"org.apache.reef.io.network.naming.avro",
+ "type":"record",
+ "name":"AvroNamingLookupResponse",
+ "fields":[
+ {"name":"tuples","type":{"type":"array", "items":"AvroNamingAssignment"}}
+ ]
+},
+{
+ "namespace":"org.apache.reef.io.network.naming.avro",
+ "type":"record",
+ "name":"AvroNamingRegisterRequest",
+ "fields":[
+ {"name":"id","type":"string"},
+ {"name":"host","type":"string"},
+ {"name":"port","type":"int"}
+ ]
+},
+{
+ "namespace":"org.apache.reef.io.network.naming.avro",
+ "type":"record",
+ "name":"AvroNamingUnRegisterRequest",
+ "fields":[
+ {"name":"id","type":"string"}
+ ]
+}
+]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java
new file mode 100644
index 0000000..632a349
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer;
+import org.apache.reef.io.network.util.Pair;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The driver component for the DataLoadingService
+ * Also acts as the central point for resource requests
+ * All the allocated evaluators pass through this and
+ * the ones that need data loading have a context stacked
+ * that enables a task to get access to Data via the
+ * {@link DataSet}.
+ * <p/>
+ * TODO: Add timeouts
+ */
+@DriverSide
+@Unit
+public class DataLoader {
+
+ private static final Logger LOG = Logger.getLogger(DataLoader.class.getName());
+
+ private final ConcurrentMap<String, Pair<Configuration, Configuration>> submittedDataEvalConfigs = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Configuration> submittedComputeEvalConfigs = new ConcurrentHashMap<>();
+ private final BlockingQueue<Configuration> failedComputeEvalConfigs = new LinkedBlockingQueue<>();
+ private final BlockingQueue<Pair<Configuration, Configuration>> failedDataEvalConfigs = new LinkedBlockingQueue<>();
+
+ private final AtomicInteger numComputeRequestsToSubmit = new AtomicInteger(0);
+
+ private final DataLoadingService dataLoadingService;
+ private final int dataEvalMemoryMB;
+ private final int dataEvalCore;
+ private final EvaluatorRequest computeRequest;
+ private final SingleThreadStage<EvaluatorRequest> resourceRequestStage;
+ private final ResourceRequestHandler resourceRequestHandler;
+ private final int computeEvalMemoryMB;
+ private final int computeEvalCore;
+ private final EvaluatorRequestor requestor;
+
+ @Inject
+ public DataLoader(
+ final Clock clock,
+ final EvaluatorRequestor requestor,
+ final DataLoadingService dataLoadingService,
+ final @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorMemoryMB.class) int dataEvalMemoryMB,
+ final @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorNumberOfCores.class) int dataEvalCore,
+ final @Parameter(DataLoadingRequestBuilder.DataLoadingComputeRequest.class) String serializedComputeRequest) {
+
+ // FIXME: Issue #855: We need this alarm to look busy for REEF.
+ clock.scheduleAlarm(30000, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm time) {
+ LOG.log(Level.FINE, "Received Alarm: {0}", time);
+ }
+ });
+
+ this.requestor = requestor;
+ this.dataLoadingService = dataLoadingService;
+ this.dataEvalMemoryMB = dataEvalMemoryMB;
+ this.dataEvalCore = dataEvalCore;
+ this.resourceRequestHandler = new ResourceRequestHandler(requestor);
+ this.resourceRequestStage = new SingleThreadStage<>(this.resourceRequestHandler, 2);
+
+ if (serializedComputeRequest.equals("NULL")) {
+ this.computeRequest = null;
+ this.computeEvalMemoryMB = -1;
+ computeEvalCore = 1;
+ } else {
+ this.computeRequest = EvaluatorRequestSerializer.deserialize(serializedComputeRequest);
+ this.computeEvalMemoryMB = this.computeRequest.getMegaBytes();
+ this.computeEvalCore = this.computeRequest.getNumberOfCores();
+ this.numComputeRequestsToSubmit.set(this.computeRequest.getNumber());
+
+ this.resourceRequestStage.onNext(this.computeRequest);
+ }
+
+ this.resourceRequestStage.onNext(getDataLoadingRequest());
+ }
+
+ private EvaluatorRequest getDataLoadingRequest() {
+ return EvaluatorRequest.newBuilder()
+ .setNumber(this.dataLoadingService.getNumberOfPartitions())
+ .setMemory(this.dataEvalMemoryMB)
+ .setNumberOfCores(this.dataEvalCore)
+ .build();
+ }
+
+ public class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime startTime) {
+ LOG.log(Level.INFO, "StartTime: {0}", startTime);
+ resourceRequestHandler.releaseResourceRequestGate();
+ }
+ }
+
+ public class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+
+ @Override
+ public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+
+ final String evalId = allocatedEvaluator.getId();
+ LOG.log(Level.FINEST, "Allocated evaluator: {0}", evalId);
+
+ if (!failedComputeEvalConfigs.isEmpty()) {
+ LOG.log(Level.FINE, "Failed Compute requests need to be satisfied for {0}", evalId);
+ final Configuration conf = failedComputeEvalConfigs.poll();
+ if (conf != null) {
+ LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId);
+ allocatedEvaluator.submitContext(conf);
+ submittedComputeEvalConfigs.put(evalId, conf);
+ return;
+ }
+ }
+
+ if (!failedDataEvalConfigs.isEmpty()) {
+ LOG.log(Level.FINE, "Failed Data requests need to be satisfied for {0}", evalId);
+ final Pair<Configuration, Configuration> confPair = failedDataEvalConfigs.poll();
+ if (confPair != null) {
+ LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId);
+ allocatedEvaluator.submitContextAndService(confPair.first, confPair.second);
+ submittedDataEvalConfigs.put(evalId, confPair);
+ return;
+ }
+ }
+
+ final int evaluatorsForComputeRequest = numComputeRequestsToSubmit.decrementAndGet();
+ LOG.log(Level.FINE, "Evaluators for compute request: {0}", evaluatorsForComputeRequest);
+
+ if (evaluatorsForComputeRequest >= 0) {
+ try {
+ final Configuration idConfiguration = ContextConfiguration.CONF
+ .set(ContextConfiguration.IDENTIFIER,
+ dataLoadingService.getComputeContextIdPrefix() + evaluatorsForComputeRequest)
+ .build();
+ LOG.log(Level.FINE, "Submitting Compute Context to {0}", evalId);
+ allocatedEvaluator.submitContext(idConfiguration);
+ submittedComputeEvalConfigs.put(allocatedEvaluator.getId(), idConfiguration);
+ if (evaluatorsForComputeRequest == 0) {
+ LOG.log(Level.FINE, "All Compute requests satisfied. Releasing gate");
+ resourceRequestHandler.releaseResourceRequestGate();
+ }
+ } catch (final BindException e) {
+ throw new RuntimeException("Unable to bind context id for Compute request", e);
+ }
+
+ } else {
+
+ final Pair<Configuration, Configuration> confPair = new Pair<>(
+ dataLoadingService.getContextConfiguration(allocatedEvaluator),
+ dataLoadingService.getServiceConfiguration(allocatedEvaluator));
+
+ LOG.log(Level.FINE, "Submitting data loading context to {0}", evalId);
+ allocatedEvaluator.submitContextAndService(confPair.first, confPair.second);
+ submittedDataEvalConfigs.put(allocatedEvaluator.getId(), confPair);
+ }
+ }
+ }
+
+ public class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
+ @Override
+ public void onNext(final FailedEvaluator failedEvaluator) {
+
+ final String evalId = failedEvaluator.getId();
+
+ final Configuration computeConfig = submittedComputeEvalConfigs.remove(evalId);
+ if (computeConfig != null) {
+
+ LOG.log(Level.INFO, "Received failed compute evaluator: {0}", evalId);
+ failedComputeEvalConfigs.add(computeConfig);
+
+ requestor.submit(EvaluatorRequest.newBuilder()
+ .setMemory(computeEvalMemoryMB).setNumber(1).setNumberOfCores(computeEvalCore).build());
+
+ } else {
+
+ final Pair<Configuration, Configuration> confPair = submittedDataEvalConfigs.remove(evalId);
+ if (confPair != null) {
+
+ LOG.log(Level.INFO, "Received failed data evaluator: {0}", evalId);
+ failedDataEvalConfigs.add(confPair);
+
+ requestor.submit(EvaluatorRequest.newBuilder()
+ .setMemory(dataEvalMemoryMB).setNumber(1).setNumberOfCores(dataEvalCore).build());
+
+ } else {
+
+ LOG.log(Level.SEVERE, "Received unknown failed evaluator " + evalId,
+ failedEvaluator.getEvaluatorException());
+
+ throw new RuntimeException("Received failed evaluator that I did not submit: " + evalId);
+ }
+ }
+ }
+ }
+}