You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/07/02 21:48:49 UTC

incubator-reef git commit: [REEF-393]: OutputService for writing files to filesystem

Repository: incubator-reef
Updated Branches:
  refs/heads/master 26156627e -> 15dd78698


[REEF-393]: OutputService for writing files to filesystem

This addressed the issue by
  * implementing OutputService.
  * implementing an example application using OutputService.

JIRA:
  [REEF-393](https://issues.apache.org/jira/browse/REEF-393)

Pull Request:
  This closes #242


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/15dd7869
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/15dd7869
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/15dd7869

Branch: refs/heads/master
Commit: 15dd786988d9cfb5d456a0c67fb3d92ab9e02661
Parents: 2615662
Author: Kijung Shin <ki...@gmail.com>
Authored: Thu Jun 25 17:02:51 2015 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Jul 2 12:45:50 2015 -0700

----------------------------------------------------------------------
 .../data/output/OutputServiceDriver.java        | 122 +++++++++++++
 .../examples/data/output/OutputServiceREEF.java | 171 +++++++++++++++++++
 .../examples/data/output/OutputServiceTask.java |  64 +++++++
 .../reef/examples/data/output/package-info.java |  24 +++
 .../reef/io/data/output/OutputService.java      |  34 ++++
 .../io/data/output/OutputStreamProvider.java    |  45 +++++
 .../reef/io/data/output/TaskOutputService.java  | 126 ++++++++++++++
 .../data/output/TaskOutputServiceBuilder.java   |  46 +++++
 .../data/output/TaskOutputStreamProvider.java   |  48 ++++++
 .../output/TaskOutputStreamProviderHDFS.java    |  83 +++++++++
 .../output/TaskOutputStreamProviderLocal.java   |  78 +++++++++
 .../reef/io/data/output/package-info.java       |  26 +++
 12 files changed, 867 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java
new file mode 100644
index 0000000..50fb86c
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.data.output;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.io.data.output.OutputService;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver of the output service demo app: requests evaluators and submits an OutputServiceTask to each of them.
+ */
+@Unit
+public final class OutputServiceDriver {
+  private static final Logger LOG = Logger.getLogger(OutputServiceDriver.class.getName());
+
+  /**
+   * Evaluator requestor object used to create new evaluator containers.
+   */
+  private final EvaluatorRequestor requestor;
+
+  /**
+   * Output service whose service configuration is submitted to every allocated evaluator.
+   */
+  private final OutputService outputService;
+
+  /**
+   * Counter used to build task identifiers.
+   * Each submitted task gets the next value as the suffix of its id,
+   * e.g. Task-0, Task-1, and so on.
+   */
+  private final AtomicInteger taskId = new AtomicInteger(0);
+
+  /**
+   * Job driver constructor - instantiated via TANG.
+   *
+   * @param requestor evaluator requestor object used to create new evaluator containers.
+   * @param outputService output service object.
+   */
+  @Inject
+  public OutputServiceDriver(final EvaluatorRequestor requestor,
+                             final OutputService outputService) {
+    LOG.log(Level.FINE, "Instantiated 'OutputServiceDriver'");
+    this.requestor = requestor;
+    this.outputService = outputService;
+  }
+
+  /**
+   * Handles the StartTime event: Requests three single-core Evaluators.
+   */
+  public final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      OutputServiceDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(3)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build());
+      LOG.log(Level.INFO, "Requested Evaluator.");
+    }
+  }
+
+  /**
+   * Handles AllocatedEvaluator: Submits a context together with the output service configuration.
+   */
+  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      LOG.log(Level.INFO, "Submitting Output Service to AllocatedEvaluator: {0}", allocatedEvaluator);
+      final Configuration contextConfiguration = ContextConfiguration.CONF
+          .set(ContextConfiguration.IDENTIFIER, "OutputServiceContext")
+          .build();
+      allocatedEvaluator.submitContextAndService(
+          contextConfiguration, outputService.getServiceConfiguration());
+    }
+  }
+
+  /**
+   * Handles ActiveContext: Submits an OutputServiceTask with a unique "Task-N" id to the context.
+   */
+  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+      LOG.log(Level.INFO,
+          "Submitting OutputServiceREEF task to AllocatedEvaluator: {0}",
+          activeContext.getEvaluatorDescriptor());
+      final Configuration taskConfiguration = TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, "Task-" + taskId.getAndIncrement())
+          .set(TaskConfiguration.TASK, OutputServiceTask.class)
+          .build();
+      activeContext.submitTask(taskConfiguration);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java
new file mode 100644
index 0000000..139a8d2
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.data.output;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.io.data.output.*;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Client that configures and launches the output service demo app.
+ */
+@ClientSide
+public final class OutputServiceREEF {
+  private static final Logger LOG = Logger.getLogger(OutputServiceREEF.class.getName());
+
+  public static void main(final String[] args)
+      throws InjectionException, BindException, IOException {
+    // Parse the command line into the named parameters with short names local, timeout and output.
+    final Tang tang = Tang.Factory.getTang();
+    final JavaConfigurationBuilder cb = tang.newConfigurationBuilder();
+    new CommandLine(cb)
+        .registerShortNameOfClass(Local.class)
+        .registerShortNameOfClass(TimeOut.class)
+        .registerShortNameOfClass(OutputDir.class)
+        .processCommandLine(args);
+    // Read the parsed values; the timeout is given in minutes and converted to milliseconds.
+    final Injector injector = tang.newInjector(cb.build());
+    final boolean isLocal = injector.getNamedInstance(Local.class);
+    final String outputDir = injector.getNamedInstance(OutputDir.class);
+    final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000;
+    // Merge the driver and output service configurations and run the job on the chosen runtime.
+    final Configuration driverConf = getDriverConf();
+    final Configuration outputServiceConf = getOutputServiceConf(isLocal, outputDir);
+    final Configuration submittedConfiguration = Tang.Factory.getTang()
+        .newConfigurationBuilder(driverConf, outputServiceConf)
+        .build();
+    final LauncherStatus state = DriverLauncher.getLauncher(getRuntimeConf(isLocal))
+        .run(submittedConfiguration, jobTimeout);
+
+    LOG.log(Level.INFO, "REEF job completed: {0}", state);
+  }
+
+  /**
+   * @param isLocal true for local runtime, or false for YARN runtime.
+   * @return The runtime configuration; the local runtime is limited to at most 3 evaluators.
+   */
+  private static Configuration getRuntimeConf(final boolean isLocal) {
+    final Configuration runtimeConf;
+    if (isLocal) {
+      LOG.log(Level.INFO, "Running the output service demo on the local runtime");
+      runtimeConf = LocalRuntimeConfiguration.CONF
+          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 3)
+          .build();
+    } else {
+      LOG.log(Level.INFO, "Running the output service demo on YARN");
+      runtimeConf = YarnClientConfiguration.CONF.build();
+    }
+    return runtimeConf;
+  }
+
+  /**
+   * @return The Driver configuration, which registers the event handlers of OutputServiceDriver.
+   */
+  private static Configuration getDriverConf() {
+    final Configuration driverConf = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(OutputServiceDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "OutputServiceREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, OutputServiceDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, OutputServiceDriver.EvaluatorAllocatedHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, OutputServiceDriver.ActiveContextHandler.class)
+        .build();
+
+    return driverConf;
+  }
+
+  /**
+   * @param isLocal true for local runtime, or false for YARN runtime.
+   * @param outputDir path of the output directory; converted to an absolute path for the local runtime.
+   * @return The configuration to use OutputService, with the Local stream provider if isLocal, else the HDFS one.
+   */
+  private static Configuration getOutputServiceConf(final boolean isLocal, final String outputDir) {
+    final Configuration outputServiceConf;
+    if (isLocal) {
+      outputServiceConf = TaskOutputServiceBuilder.CONF
+          .set(TaskOutputServiceBuilder.TASK_OUTPUT_STREAM_PROVIDER, TaskOutputStreamProviderLocal.class)
+          .set(TaskOutputServiceBuilder.OUTPUT_PATH, getAbsolutePath(outputDir))
+          .build();
+    } else {
+      outputServiceConf = TaskOutputServiceBuilder.CONF
+          .set(TaskOutputServiceBuilder.TASK_OUTPUT_STREAM_PROVIDER, TaskOutputStreamProviderHDFS.class)
+          .set(TaskOutputServiceBuilder.OUTPUT_PATH, outputDir)
+          .build();
+    }
+    return outputServiceConf;
+  }
+
+  /**
+   * Transforms the given relative path into an absolute path, based on the current directory where a user runs the demo.
+   * @param relativePath relative path
+   * @return absolute path
+   */
+  private static String getAbsolutePath(final String relativePath) {
+    final File outputFile = new File(relativePath);
+    return outputFile.getAbsolutePath();
+  }
+
+  /**
+   * Command line parameter = true to run locally (the default), or false to run on YARN.
+   */
+  @NamedParameter(doc = "Whether or not to run on the local runtime",
+      short_name = "local", default_value = "true")
+  public static final class Local implements Name<Boolean> {
+  }
+
+  /**
+   * Command line parameter = number of minutes before timeout (2 by default).
+   */
+  @NamedParameter(doc = "Number of minutes before timeout",
+      short_name = "timeout", default_value = "2")
+  public static final class TimeOut implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter = path of the output directory. No default value is declared for it.
+   */
+  @NamedParameter(doc = "Path of the output directory",
+      short_name = "output")
+  public static final class OutputDir implements Name<String> {
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private OutputServiceREEF() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java
new file mode 100644
index 0000000..b9ae0ea
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.data.output;
+
+import org.apache.reef.io.data.output.OutputStreamProvider;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * The Task code for the output service demo app.
+ * This task obtains an output stream named "hello" from the injected
+ * output stream provider and writes "Hello REEF!" on it.
+ */
+public final class OutputServiceTask implements Task {
+
+  /**
+   * Output stream provider object through which tasks create output streams.
+   */
+  private final OutputStreamProvider outputStreamProvider;
+
+  /**
+   * Task constructor - instantiated via TANG.
+   *
+   * @param outputStreamProvider Output stream provider object through which tasks create output streams.
+   */
+  @Inject
+  public OutputServiceTask(final OutputStreamProvider outputStreamProvider) {
+    this.outputStreamProvider = outputStreamProvider;
+  }
+
+  /**
+   * Creates an output stream named "hello", writes "Hello REEF!" on it, and closes the stream.
+   *
+   * @param memento the memento object passed down by the driver (not used by this task).
+   * @return null
+   * @throws java.io.IOException if creating, writing to, or closing the output stream fails
+   */
+  @Override
+  public byte[] call(final byte[] memento) throws IOException {
+    try (final DataOutputStream outputStream = outputStreamProvider.create("hello")) {
+      outputStream.writeBytes("Hello REEF!");
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java
new file mode 100644
index 0000000..f9c7371
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Example application using the output service.
+ * Three evaluators are allocated, and three tasks running on them
+ * write outputs using the output service.
+ */
+package org.apache.reef.examples.data.output;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java
new file mode 100644
index 0000000..c81690d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import org.apache.reef.tang.Configuration;
+
+/**
+ * Interface that all output services should implement.
+ */
+public interface OutputService {
+
+  /**
+   * Provides the service configuration through which the output service is set up.
+   *
+   * @return service configuration.
+   */
+  Configuration getServiceConfiguration();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java
new file mode 100644
index 0000000..62b3283
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A provider through which users create output streams.
+ */
+public interface OutputStreamProvider {
+
+  /**
+   * Creates an output stream using the given name.
+   *
+   * @param name name of the created output stream.
+   *             It is used as the name of the directory if the created output stream is a file output stream.
+   * @return created output stream
+   * @throws java.io.IOException if the output stream cannot be created
+   */
+  DataOutputStream create(final String name) throws IOException;
+
+  /**
+   * Releases the resources held by this provider.
+   *
+   * @throws java.io.IOException if releasing the resources fails
+   */
+  void close() throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java
new file mode 100644
index 0000000..6388f77
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import org.apache.reef.driver.context.ServiceConfiguration;
+import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A service class of the task output service.
+ * The task output service provides an output stream,
+ * through which tasks write their output to a file
+ * without considering the current runtime
+ * and collision with other tasks.
+ */
+@Unit
+public final class TaskOutputService implements OutputService {
+  private static final Logger LOG = Logger.getLogger(TaskOutputService.class.getName());
+
+  /**
+   * Output stream provider object through which tasks create output streams.
+   */
+  private final TaskOutputStreamProvider taskOutputStreamProvider;
+
+  /**
+   * Path of the directory where output files are created.
+   */
+  private final String outputPath;
+
+  /**
+   * Service constructor - instantiated via TANG.
+   *
+   * @param taskOutputStreamProvider Output stream provider object through which tasks create file output streams.
+   * @param outputPath Path of the directory where output files are created.
+   */
+  @Inject
+  private TaskOutputService(
+      final TaskOutputStreamProvider taskOutputStreamProvider,
+      @Parameter(OutputPath.class) final String outputPath) {
+    this.taskOutputStreamProvider = taskOutputStreamProvider;
+    this.outputPath = outputPath;
+  }
+
+  /**
+   * Provides a service configuration that registers the stream provider and the event handlers of this service.
+   *
+   * @return service configuration, also binding the provider implementation and the output path.
+   */
+  @Override
+  public Configuration getServiceConfiguration() {
+
+    final Configuration partialServiceConf = ServiceConfiguration.CONF
+        .set(ServiceConfiguration.SERVICES, taskOutputStreamProvider.getClass())
+        .set(ServiceConfiguration.ON_CONTEXT_STOP, ContextStopHandler.class)
+        .set(ServiceConfiguration.ON_TASK_STARTED, TaskStartHandler.class)
+        .build();
+
+    return Tang.Factory.getTang()
+        .newConfigurationBuilder(partialServiceConf)
+        .bindImplementation(OutputStreamProvider.class, taskOutputStreamProvider.getClass())
+        .bindImplementation(TaskOutputStreamProvider.class, taskOutputStreamProvider.getClass())
+        .bindNamedParameter(OutputPath.class, outputPath)
+        .build();
+  }
+
+  /**
+   * Handles the ContextStop event: Closes the output stream provider, rethrowing an IOException unchecked.
+   */
+  private final class ContextStopHandler implements EventHandler<ContextStop> {
+    @Override
+    public void onNext(final ContextStop contextStop) {
+      LOG.log(Level.INFO, "Context stopped, close the OutputStreamProvider.");
+      try {
+        taskOutputStreamProvider.close();
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Handles the TaskStart event: Passes the id of the started task to the output stream provider.
+   */
+  private final class TaskStartHandler implements EventHandler<TaskStart> {
+    @Override
+    public void onNext(final TaskStart taskStart) {
+      LOG.log(Level.INFO, "Task {0} started, create the OutputStreamProvider.", taskStart.getId());
+      taskOutputStreamProvider.setTaskId(taskStart.getId());
+    }
+  }
+
+  /**
+   * Path of the directory where output files are created.
+   */
+  @NamedParameter(doc = "Path of the directory where output files are created")
+  public static final class OutputPath implements Name<String> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java
new file mode 100644
index 0000000..d423e5b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.RequiredImpl;
+import org.apache.reef.tang.formats.RequiredParameter;
+
+/**
+ * Configuration module builder that binds TaskOutputService as the OutputService implementation.
+ */
+public final class TaskOutputServiceBuilder extends ConfigurationModuleBuilder {
+
+  /**
+   * Required: the provider implementation through which users create task output streams.
+   */
+  public static final RequiredImpl<TaskOutputStreamProvider> TASK_OUTPUT_STREAM_PROVIDER = new RequiredImpl<>();
+
+  /**
+   * Required: path of the directory where output files are created.
+   */
+  public static final RequiredParameter<String> OUTPUT_PATH = new RequiredParameter<>();
+  /** Configuration module binding the output service, its stream provider, and the output path. */
+  public static final ConfigurationModule CONF = new TaskOutputServiceBuilder()
+      .bindImplementation(OutputService.class, TaskOutputService.class)
+      .bindImplementation(TaskOutputStreamProvider.class, TASK_OUTPUT_STREAM_PROVIDER)
+      .bindNamedParameter(TaskOutputService.OutputPath.class, OUTPUT_PATH)
+      .build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java
new file mode 100644
index 0000000..f3b9d0b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+/**
+ * Base class of the providers through which users create task output streams.
+ * It keeps the id of the current task so that subclasses can use it when creating streams.
+ */
+public abstract class TaskOutputStreamProvider implements OutputStreamProvider {
+
+  /**
+   * Id of the current task; stays {@code null} until {@link #setTaskId(String)} is called.
+   */
+  private String taskId;
+
+  /**
+   * Records the id of the current task.
+   *
+   * @param taskId id of the current task
+   */
+  protected final void setTaskId(final String taskId) {
+    this.taskId = taskId;
+  }
+
+  /**
+   * Returns the id most recently passed to {@link #setTaskId(String)}.
+   *
+   * @return id of the current task, or {@code null} if none has been set yet
+   */
+  protected final String getTaskId() {
+    return taskId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java
new file mode 100644
index 0000000..e322084
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Implementation of {@link TaskOutputStreamProvider}.
+ * It provides output streams to files on HDFS.
+ */
+@TaskSide
+public final class TaskOutputStreamProviderHDFS extends TaskOutputStreamProvider {
+
+  /**
+   * Path of the output directory on HDFS to write outputs.
+   */
+  private final String outputPath;
+
+  /**
+   * HDFS file system handle owned by this provider; it is released in {@link #close()}.
+   */
+  private final FileSystem fs;
+
+  /**
+   * Constructor - instantiated via TANG.
+   *
+   * @param outputPath path of the output directory on HDFS to write outputs.
+   * @throws java.io.IOException if the file system cannot be obtained
+   */
+  @Inject
+  private TaskOutputStreamProviderHDFS(
+      @Parameter(TaskOutputService.OutputPath.class) final String outputPath) throws IOException {
+    this.outputPath = outputPath;
+    final JobConf jobConf = new JobConf();
+    // FileSystem.get() hands out an instance that is cached and shared within the JVM.
+    // Closing that shared instance in close() would also close it for every other user,
+    // so a private instance is requested instead.
+    this.fs = FileSystem.newInstance(jobConf);
+  }
+
+  /**
+   * create a file output stream using the given name.
+   * The path of the file on HDFS is 'outputPath/name/taskId'.
+   *
+   * @param name name of the created output stream
+   *             It is used as the name of the directory if the created output stream is a file output stream
+   * @return OutputStream to a file on HDFS. The path of the file is 'outputPath/name/taskId'
+   * @throws java.io.IOException if the directory 'outputPath/name' cannot be created
+   *                             or the file cannot be opened for writing
+   */
+  @Override
+  public DataOutputStream create(final String name) throws IOException {
+    final String directoryPath = outputPath + Path.SEPARATOR + name;
+    final Path directory = new Path(directoryPath);
+    // Report a failed directory creation right away instead of silently ignoring it.
+    if (!fs.exists(directory) && !fs.mkdirs(directory)) {
+      throw new IOException("Failed to create the output directory: " + directoryPath);
+    }
+    return fs.create(new Path(directoryPath + Path.SEPARATOR + getTaskId()));
+  }
+
+  /**
+   * Closes the file system handle held by this provider.
+   *
+   * @throws java.io.IOException if closing the file system fails
+   */
+  @Override
+  public void close() throws IOException {
+    fs.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java
new file mode 100644
index 0000000..f3afb89
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Implementation of {@link TaskOutputStreamProvider}.
+ * It provides FileOutputStreams on the local file system.
+ */
+public final class TaskOutputStreamProviderLocal extends TaskOutputStreamProvider {
+
+  /**
+   * Path of the output directory on the local disk to write outputs.
+   */
+  private final String outputPath;
+
+  /**
+   * Constructor - instantiated via TANG.
+   *
+   * @param outputPath path of the output directory on the local disk to write outputs.
+   */
+  @Inject
+  private TaskOutputStreamProviderLocal(
+      @Parameter(TaskOutputService.OutputPath.class) final String outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  /**
+   * create a file output stream using the given name.
+   * The path of the file on the local file system is 'outputPath/name/taskId'.
+   *
+   * @param name name of the created output stream
+   *             It is used as the name of the directory if the created output stream is a file output stream
+   * @return OutputStream to a file on local file system. The path of the file is 'outputPath/name/taskId'
+   * @throws java.io.IOException if the directory 'outputPath/name' cannot be created
+   *                             or the file cannot be opened for writing
+   */
+  @Override
+  public DataOutputStream create(final String name) throws IOException {
+    final String directoryPath = outputPath + File.separator + name;
+    final File directory = new File(directoryPath);
+
+    // mkdirs() also returns false when somebody else created the directory in the meantime.
+    // A lock inside this JVM cannot rule that out for other processes, so the directory is
+    // checked again after the attempt and only a directory that is really missing is an error.
+    if (!directory.isDirectory() && !directory.mkdirs() && !directory.isDirectory()) {
+      throw new IOException("Failed to create the output directory: " + directoryPath);
+    }
+
+    final File file = new File(directoryPath + File.separator + getTaskId());
+    return new DataOutputStream(new FileOutputStream(file));
+  }
+
+  /**
+   * Does nothing: this provider holds no resources of its own.
+   */
+  @Override
+  public void close() throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java
new file mode 100644
index 0000000..d6711db
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Data Output Service.
+ * The output service provides tasks with a common interface
+ * through which they can write outputs without
+ * considering the current runtime or collisions with other tasks.
+ */
+package org.apache.reef.io.data.output;