You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/07/02 21:48:49 UTC
incubator-reef git commit: [REEF-393]: OutputService for writing
files to filesystem
Repository: incubator-reef
Updated Branches:
refs/heads/master 26156627e -> 15dd78698
[REEF-393]: OutputService for writing files to filesystem
This addressed the issue by
* implementing OutputService.
* implementing an example application using OutputService.
JIRA:
[REEF-393](https://issues.apache.org/jira/browse/REEF-393)
Pull Request:
This closes #242
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/15dd7869
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/15dd7869
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/15dd7869
Branch: refs/heads/master
Commit: 15dd786988d9cfb5d456a0c67fb3d92ab9e02661
Parents: 2615662
Author: Kijung Shin <ki...@gmail.com>
Authored: Thu Jun 25 17:02:51 2015 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Jul 2 12:45:50 2015 -0700
----------------------------------------------------------------------
.../data/output/OutputServiceDriver.java | 122 +++++++++++++
.../examples/data/output/OutputServiceREEF.java | 171 +++++++++++++++++++
.../examples/data/output/OutputServiceTask.java | 64 +++++++
.../reef/examples/data/output/package-info.java | 24 +++
.../reef/io/data/output/OutputService.java | 34 ++++
.../io/data/output/OutputStreamProvider.java | 45 +++++
.../reef/io/data/output/TaskOutputService.java | 126 ++++++++++++++
.../data/output/TaskOutputServiceBuilder.java | 46 +++++
.../data/output/TaskOutputStreamProvider.java | 48 ++++++
.../output/TaskOutputStreamProviderHDFS.java | 83 +++++++++
.../output/TaskOutputStreamProviderLocal.java | 78 +++++++++
.../reef/io/data/output/package-info.java | 26 +++
12 files changed, 867 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java
new file mode 100644
index 0000000..50fb86c
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.data.output;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.io.data.output.OutputService;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver side of the output service demo application.
+ * It asks for three evaluators, submits a context together with the output
+ * service to each allocated evaluator, and launches one {@link OutputServiceTask}
+ * on every active context.
+ */
+@Unit
+public final class OutputServiceDriver {
+  private static final Logger LOG = Logger.getLogger(OutputServiceDriver.class.getName());
+
+  /**
+   * Used to ask the resource manager for evaluator containers.
+   */
+  private final EvaluatorRequestor requestor;
+
+  /**
+   * Output service whose service configuration is shipped to every evaluator.
+   */
+  private final OutputService outputService;
+
+  /**
+   * Hands out a distinct numeric suffix to every submitted task,
+   * producing identifiers such as Task-0, Task-1, and so on.
+   */
+  private final AtomicInteger nextTaskIndex = new AtomicInteger(0);
+
+  /**
+   * Creates the driver; invoked by Tang.
+   *
+   * @param requestor     used to ask for evaluator containers.
+   * @param outputService output service to install on the evaluators.
+   */
+  @Inject
+  public OutputServiceDriver(final EvaluatorRequestor requestor,
+                             final OutputService outputService) {
+    LOG.log(Level.FINE, "Instantiated 'OutputServiceDriver'");
+    this.requestor = requestor;
+    this.outputService = outputService;
+  }
+
+  /**
+   * StartTime handler: asks for three single-core evaluators with 64 MB of memory each.
+   */
+  public final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      final EvaluatorRequest request = EvaluatorRequest.newBuilder()
+          .setNumber(3)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build();
+      requestor.submit(request);
+      LOG.log(Level.INFO, "Requested Evaluator.");
+    }
+  }
+
+  /**
+   * AllocatedEvaluator handler: submits a context named "OutputServiceContext"
+   * together with the configuration of the output service.
+   */
+  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      LOG.log(Level.INFO, "Submitting Output Service to AllocatedEvaluator: {0}", allocatedEvaluator);
+      final Configuration serviceConfiguration = outputService.getServiceConfiguration();
+      final Configuration contextConfiguration = ContextConfiguration.CONF
+          .set(ContextConfiguration.IDENTIFIER, "OutputServiceContext")
+          .build();
+      allocatedEvaluator.submitContextAndService(contextConfiguration, serviceConfiguration);
+    }
+  }
+
+  /**
+   * ActiveContext handler: submits an {@link OutputServiceTask} with a fresh "Task-N" identifier.
+   */
+  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+      LOG.log(Level.INFO,
+          "Submitting OutputServiceREEF task to AllocatedEvaluator: {0}",
+          activeContext.getEvaluatorDescriptor());
+      final String taskIdentifier = "Task-" + nextTaskIndex.getAndIncrement();
+      final Configuration taskConfiguration = TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, taskIdentifier)
+          .set(TaskConfiguration.TASK, OutputServiceTask.class)
+          .build();
+      activeContext.submitTask(taskConfiguration);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java
new file mode 100644
index 0000000..139a8d2
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.data.output;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.io.data.output.*;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Client for the output service demo app.
+ */
+@ClientSide
+public final class OutputServiceREEF {
+  private static final Logger LOG = Logger.getLogger(OutputServiceREEF.class.getName());
+
+  /**
+   * Parses the command line, assembles the driver and output service
+   * configurations, and runs the demo job on the selected runtime.
+   *
+   * @param args command line arguments ({@code -local}, {@code -timeout} in minutes, {@code -output})
+   * @throws InjectionException propagated from reading the command line parameters
+   * @throws BindException      propagated from binding the command line parameters
+   * @throws IOException        propagated from command line processing
+   */
+  public static void main(final String[] args)
+      throws InjectionException, BindException, IOException {
+
+    final Tang tang = Tang.Factory.getTang();
+    final JavaConfigurationBuilder cb = tang.newConfigurationBuilder();
+    new CommandLine(cb)
+        .registerShortNameOfClass(Local.class)
+        .registerShortNameOfClass(TimeOut.class)
+        .registerShortNameOfClass(OutputDir.class)
+        .processCommandLine(args);
+
+    final Injector injector = tang.newInjector(cb.build());
+    final boolean isLocal = injector.getNamedInstance(Local.class);
+    final String outputDir = injector.getNamedInstance(OutputDir.class);
+    // Convert minutes to milliseconds in long arithmetic; an int product
+    // silently overflows to a negative timeout for large -timeout values.
+    final long jobTimeout = injector.getNamedInstance(TimeOut.class) * 60L * 1000L;
+
+    final Configuration driverConf = getDriverConf();
+    final Configuration outputServiceConf = getOutputServiceConf(isLocal, outputDir);
+    final Configuration submittedConfiguration = tang
+        .newConfigurationBuilder(driverConf, outputServiceConf)
+        .build();
+    final LauncherStatus state = DriverLauncher.getLauncher(getRuntimeConf(isLocal))
+        .run(submittedConfiguration, jobTimeout);
+
+    LOG.log(Level.INFO, "REEF job completed: {0}", state);
+  }
+
+  /**
+   * @param isLocal true for local runtime, or false for YARN runtime.
+   * @return The runtime configuration
+   */
+  private static Configuration getRuntimeConf(final boolean isLocal) {
+    final Configuration runtimeConf;
+    if (isLocal) {
+      LOG.log(Level.INFO, "Running the output service demo on the local runtime");
+      runtimeConf = LocalRuntimeConfiguration.CONF
+          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 3)
+          .build();
+    } else {
+      LOG.log(Level.INFO, "Running the output service demo on YARN");
+      runtimeConf = YarnClientConfiguration.CONF.build();
+    }
+    return runtimeConf;
+  }
+
+  /**
+   * @return The Driver configuration.
+   */
+  private static Configuration getDriverConf() {
+    final Configuration driverConf = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(OutputServiceDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "OutputServiceREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, OutputServiceDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, OutputServiceDriver.EvaluatorAllocatedHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, OutputServiceDriver.ActiveContextHandler.class)
+        .build();
+
+    return driverConf;
+  }
+
+  /**
+   * Builds the output service configuration.
+   * On the local runtime the output directory is resolved to an absolute path,
+   * and the local stream provider is used; otherwise the HDFS stream provider is used.
+   *
+   * @param isLocal   true for local runtime, or false for YARN runtime.
+   * @param outputDir path of the output directory.
+   * @return The configuration to use OutputService
+   */
+  private static Configuration getOutputServiceConf(final boolean isLocal, final String outputDir) {
+    final Configuration outputServiceConf;
+    if (isLocal) {
+      outputServiceConf = TaskOutputServiceBuilder.CONF
+          .set(TaskOutputServiceBuilder.TASK_OUTPUT_STREAM_PROVIDER, TaskOutputStreamProviderLocal.class)
+          .set(TaskOutputServiceBuilder.OUTPUT_PATH, getAbsolutePath(outputDir))
+          .build();
+    } else {
+      outputServiceConf = TaskOutputServiceBuilder.CONF
+          .set(TaskOutputServiceBuilder.TASK_OUTPUT_STREAM_PROVIDER, TaskOutputStreamProviderHDFS.class)
+          .set(TaskOutputServiceBuilder.OUTPUT_PATH, outputDir)
+          .build();
+    }
+    return outputServiceConf;
+  }
+
+  /**
+   * Transforms the given relative path into an absolute path, based on the
+   * current directory from which the user runs the demo.
+   *
+   * @param relativePath relative path
+   * @return absolute path
+   */
+  private static String getAbsolutePath(final String relativePath) {
+    final File outputFile = new File(relativePath);
+    return outputFile.getAbsolutePath();
+  }
+
+  /**
+   * Command line parameter = true to run locally, or false to run on YARN.
+   */
+  @NamedParameter(doc = "Whether or not to run on the local runtime",
+      short_name = "local", default_value = "true")
+  public static final class Local implements Name<Boolean> {
+  }
+
+  /**
+   * Command line parameter = number of minutes before timeout.
+   */
+  @NamedParameter(doc = "Number of minutes before timeout",
+      short_name = "timeout", default_value = "2")
+  public static final class TimeOut implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter = path of the output directory.
+   */
+  @NamedParameter(doc = "Path of the output directory",
+      short_name = "output")
+  public static final class OutputDir implements Name<String> {
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private OutputServiceREEF() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java
new file mode 100644
index 0000000..b9ae0ea
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.data.output;
+
+import org.apache.reef.io.data.output.OutputStreamProvider;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Task side of the output service demo application.
+ * It asks the output stream provider for a stream named "hello"
+ * and writes the text "Hello REEF!" into it.
+ */
+public final class OutputServiceTask implements Task {
+
+  /**
+   * Source of the output streams this task writes to.
+   */
+  private final OutputStreamProvider streamProvider;
+
+  /**
+   * Creates the task; invoked by Tang.
+   *
+   * @param outputStreamProvider source of the output streams this task writes to.
+   */
+  @Inject
+  public OutputServiceTask(final OutputStreamProvider outputStreamProvider) {
+    this.streamProvider = outputStreamProvider;
+  }
+
+  /**
+   * Opens the "hello" stream, writes "Hello REEF!" to it and closes it again.
+   *
+   * @param memento memento passed down by the driver; not used by this task.
+   * @return always {@code null}.
+   * @throws IOException if creating, writing to, or closing the stream fails.
+   */
+  @Override
+  public byte[] call(final byte[] memento) throws IOException {
+    final String greeting = "Hello REEF!";
+    try (DataOutputStream out = streamProvider.create("hello")) {
+      out.writeBytes(greeting);
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java
new file mode 100644
index 0000000..f9c7371
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Example application using the output service.
+ * Three evaluators are allocated, and three tasks running on them
+ * write outputs using the output service.
+ */
+package org.apache.reef.examples.data.output;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java
new file mode 100644
index 0000000..c81690d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import org.apache.reef.tang.Configuration;
+
+/**
+ * Contract shared by every output service: each one exposes the
+ * service configuration that has to be submitted to an evaluator.
+ */
+public interface OutputService {
+
+  /**
+   * Returns the service configuration of this output service.
+   *
+   * @return the service configuration.
+   */
+  Configuration getServiceConfiguration();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java
new file mode 100644
index 0000000..62b3283
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A provider through which users create output streams.
+ * It extends {@link AutoCloseable} so that a provider can be released
+ * with try-with-resources; {@link #close()} narrows the thrown type to {@link IOException}.
+ */
+public interface OutputStreamProvider extends AutoCloseable {
+
+  /**
+   * Creates an output stream using the given name.
+   *
+   * @param name name of the created output stream.
+   *             It is used as the name of the directory if the created output stream is a file output stream.
+   * @return the created output stream
+   * @throws IOException if the output stream cannot be created
+   */
+  DataOutputStream create(String name) throws IOException;
+
+  /**
+   * Releases the resources held by this provider.
+   *
+   * @throws IOException if releasing the resources fails
+   */
+  @Override
+  void close() throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java
new file mode 100644
index 0000000..6388f77
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import org.apache.reef.driver.context.ServiceConfiguration;
+import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Service class of the task output service.
+ * The task output service provides output streams
+ * through which tasks write their output to files
+ * without considering the current runtime
+ * or collisions with other tasks.
+ */
+@Unit
+public final class TaskOutputService implements OutputService {
+  private static final Logger LOG = Logger.getLogger(TaskOutputService.class.getName());
+
+  /**
+   * Output stream provider object through which tasks create output streams.
+   */
+  private final TaskOutputStreamProvider taskOutputStreamProvider;
+
+  /**
+   * Path of the directory where output files are created.
+   */
+  private final String outputPath;
+
+  /**
+   * Service constructor - instantiated via TANG.
+   *
+   * @param taskOutputStreamProvider Output stream provider object through which tasks create file output streams.
+   * @param outputPath               Path of the directory where output files are created.
+   */
+  @Inject
+  private TaskOutputService(
+      final TaskOutputStreamProvider taskOutputStreamProvider,
+      @Parameter(OutputPath.class) final String outputPath) {
+    this.taskOutputStreamProvider = taskOutputStreamProvider;
+    this.outputPath = outputPath;
+  }
+
+  /**
+   * Provides a service configuration for the output service.
+   * The configuration registers the concrete provider class as a service,
+   * installs the ContextStop and TaskStart handlers, binds both
+   * {@link OutputStreamProvider} and {@link TaskOutputStreamProvider} to that class,
+   * and carries over the output path.
+   *
+   * @return service configuration.
+   */
+  @Override
+  public Configuration getServiceConfiguration() {
+    // Bind to the runtime class of the injected provider so the evaluator
+    // instantiates the same implementation that was configured on the driver.
+    final Class<? extends TaskOutputStreamProvider> providerClass = taskOutputStreamProvider.getClass();
+
+    final Configuration partialServiceConf = ServiceConfiguration.CONF
+        .set(ServiceConfiguration.SERVICES, providerClass)
+        .set(ServiceConfiguration.ON_CONTEXT_STOP, ContextStopHandler.class)
+        .set(ServiceConfiguration.ON_TASK_STARTED, TaskStartHandler.class)
+        .build();
+
+    return Tang.Factory.getTang()
+        .newConfigurationBuilder(partialServiceConf)
+        .bindImplementation(OutputStreamProvider.class, providerClass)
+        .bindImplementation(TaskOutputStreamProvider.class, providerClass)
+        .bindNamedParameter(OutputPath.class, outputPath)
+        .build();
+  }
+
+  /**
+   * Handles the ContextStop event: Close the output stream provider.
+   */
+  private final class ContextStopHandler implements EventHandler<ContextStop> {
+    @Override
+    public void onNext(final ContextStop contextStop) {
+      LOG.log(Level.INFO, "Context stopped, close the OutputStreamProvider.");
+      try {
+        taskOutputStreamProvider.close();
+      } catch (final IOException e) {
+        // Keep the cause, but say what was being done when it failed.
+        throw new RuntimeException("Failed to close the OutputStreamProvider.", e);
+      }
+    }
+  }
+
+  /**
+   * Handles the TaskStart event: Set the task id to the output stream provider.
+   */
+  private final class TaskStartHandler implements EventHandler<TaskStart> {
+    @Override
+    public void onNext(final TaskStart taskStart) {
+      LOG.log(Level.INFO, "Task {0} started, set the task id of the OutputStreamProvider.", taskStart.getId());
+      taskOutputStreamProvider.setTaskId(taskStart.getId());
+    }
+  }
+
+  /**
+   * Path of the directory where output files are created.
+   */
+  @NamedParameter(doc = "Path of the directory where output files are created")
+  public static final class OutputPath implements Name<String> {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java
new file mode 100644
index 0000000..d423e5b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.RequiredImpl;
+import org.apache.reef.tang.formats.RequiredParameter;
+
+/**
+ * Configuration module for the {@link TaskOutputService}.
+ * Users supply the task output stream provider implementation and the output path.
+ */
+public final class TaskOutputServiceBuilder extends ConfigurationModuleBuilder {
+
+  /**
+   * Implementation of {@link TaskOutputStreamProvider} used to create task output streams.
+   */
+  public static final RequiredImpl<TaskOutputStreamProvider> TASK_OUTPUT_STREAM_PROVIDER =
+      new RequiredImpl<>();
+
+  /**
+   * Directory in which the output files are created.
+   */
+  public static final RequiredParameter<String> OUTPUT_PATH =
+      new RequiredParameter<>();
+
+  /**
+   * Module binding {@link OutputService} to {@link TaskOutputService},
+   * together with the provider implementation and output path declared above.
+   * Declared last because it reads the two fields above during static initialization.
+   */
+  public static final ConfigurationModule CONF = new TaskOutputServiceBuilder()
+      .bindImplementation(OutputService.class, TaskOutputService.class)
+      .bindImplementation(TaskOutputStreamProvider.class, TASK_OUTPUT_STREAM_PROVIDER)
+      .bindNamedParameter(TaskOutputService.OutputPath.class, OUTPUT_PATH)
+      .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java
new file mode 100644
index 0000000..f3b9d0b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+/**
+ * Base class for output stream providers that serve a single task at a time.
+ * It records the identifier of the current task for use by subclasses.
+ */
+public abstract class TaskOutputStreamProvider implements OutputStreamProvider {
+
+  /**
+   * Identifier of the current task; {@code null} until {@link #setTaskId(String)} is called.
+   */
+  private String currentTaskId;
+
+  /**
+   * Records the identifier of the current task.
+   *
+   * @param taskId id of the current task
+   */
+  protected final void setTaskId(final String taskId) {
+    currentTaskId = taskId;
+  }
+
+  /**
+   * Returns the identifier recorded by {@link #setTaskId(String)}.
+   *
+   * @return id of the current task, or {@code null} if none has been set
+   */
+  protected final String getTaskId() {
+    return currentTaskId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java
new file mode 100644
index 0000000..e322084
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Implementation of {@link TaskOutputStreamProvider} that writes to HDFS.
+ * Each call to {@link #create(String)} opens a stream to the file 'outputPath/name/taskId'.
+ */
+@TaskSide
+public final class TaskOutputStreamProviderHDFS extends TaskOutputStreamProvider {
+
+  /**
+   * Path of the output directory on HDFS to write outputs.
+   */
+  private final String outputPath;
+
+  /**
+   * File system handle owned exclusively by this provider.
+   * It is obtained via {@link FileSystem#newInstance} rather than {@link FileSystem#get}.
+   * {@code get} returns a JVM-wide cached instance, and closing that shared instance
+   * in {@link #close()} would break every other component in the process that uses it.
+   */
+  private final FileSystem fs;
+
+  /**
+   * Constructor - instantiated via TANG.
+   *
+   * @param outputPath path of the output directory on HDFS to write outputs.
+   * @throws IOException if the file system cannot be initialized
+   */
+  @Inject
+  private TaskOutputStreamProviderHDFS(
+      @Parameter(TaskOutputService.OutputPath.class) final String outputPath) throws IOException {
+    this.outputPath = outputPath;
+    this.fs = FileSystem.newInstance(new JobConf());
+  }
+
+  /**
+   * Creates a file output stream for the given name.
+   * The file is written to 'outputPath/name/taskId' on HDFS.
+   * The directory 'outputPath/name' is created first if it does not exist yet.
+   *
+   * @param name name of the created output stream; used as the directory
+   *             that collects the per-task files of this output
+   * @return stream to the file 'outputPath/name/taskId' on HDFS
+   * @throws IOException if the directory or the file cannot be created
+   */
+  @Override
+  public DataOutputStream create(final String name) throws IOException {
+    final String directoryPath = outputPath + Path.SEPARATOR + name;
+    final Path directory = new Path(directoryPath);
+    if (!fs.exists(directory)) {
+      fs.mkdirs(directory);
+    }
+    return fs.create(new Path(directoryPath + Path.SEPARATOR + getTaskId()));
+  }
+
+  /**
+   * Closes the file system handle owned by this provider.
+   *
+   * @throws IOException if closing the file system fails
+   */
+  @Override
+  public void close() throws IOException {
+    fs.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java
new file mode 100644
index 0000000..f3afb89
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.output;
+
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+
+/**
+ * Implementation of {@link TaskOutputStreamProvider}.
+ * It provides output streams to files on the local file system.
+ */
+public final class TaskOutputStreamProviderLocal extends TaskOutputStreamProvider {
+
+  /**
+   * Path of the output directory on the local disk to write outputs.
+   */
+  private final String outputPath;
+
+  /**
+   * Constructor - instantiated via TANG.
+   *
+   * @param outputPath path of the output directory on the local disk to write outputs.
+   */
+  @Inject
+  private TaskOutputStreamProviderLocal(
+      @Parameter(TaskOutputService.OutputPath.class) final String outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  /**
+   * Creates a file output stream for the given name.
+   * The file is written to 'outputPath/name/taskId' on the local file system.
+   * The directory 'outputPath/name' is created first if it does not exist yet.
+   *
+   * @param name name of the created output stream; used as the directory
+   *             that collects the per-task files of this output
+   * @return buffered stream to the file 'outputPath/name/taskId';
+   *         the caller must flush or close it for the data to reach the file
+   * @throws IOException if the directory or the file cannot be created
+   */
+  @Override
+  public DataOutputStream create(final String name) throws IOException {
+    final String directoryPath = outputPath + File.separator + name;
+
+    // Files.createDirectories does not fail when the directory already exists,
+    // so concurrent creators are fine. This includes other processes, which a
+    // JVM-local lock cannot coordinate with. Unlike File.mkdirs, it reports a
+    // real failure as an IOException instead of a boolean that is easy to ignore.
+    Files.createDirectories(new File(directoryPath).toPath());
+
+    final File file = new File(directoryPath + File.separator + getTaskId());
+    // Buffer the stream: DataOutputStream issues several small writes per value,
+    // which would otherwise each go straight to the file.
+    return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
+  }
+
+  /**
+   * Nothing to release: this provider holds no resources of its own.
+   * Each stream returned by {@link #create(String)} is closed by its caller.
+   *
+   * @throws IOException never thrown; declared by the interface
+   */
+  @Override
+  public void close() throws IOException {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java
new file mode 100644
index 0000000..d6711db
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Data Output Service.
+ * The output service provides tasks with a common interface
+ * through which they can write outputs without having to consider
+ * the current runtime or collisions with other tasks.
+ */
+package org.apache.reef.io.data.output;