You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/03/27 20:01:46 UTC
svn commit: r641943 - in /incubator/pig/trunk/src/org/apache/pig:
backend/hadoop/streaming/ impl/streaming/
Author: olga
Date: Thu Mar 27 12:01:32 2008
New Revision: 641943
URL: http://svn.apache.org/viewvc?rev=641943&view=rev
Log:
Files that I forgot to svn add as part of commit for PIG-94 patch
Added:
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link HadoopExecutableManager} is a specialization of
+ * {@link ExecutableManager} and provides HDFS-specific support for secondary
+ * outputs, task-logs etc.
+ *
+ * <code>HadoopExecutableManager</code> provides support for secondary outputs
+ * of the managed process and also persists the logs of the tasks on HDFS.
+ */
+public class HadoopExecutableManager extends ExecutableManager {
+
+ JobConf job;
+
+ String scriptOutputDir;
+ String scriptLogDir;
+ String taskId;
+
+ FSDataOutputStream errorStream;
+
+ boolean writeHeaderFooter = false;
+
+ public HadoopExecutableManager() {}
+
+ public void configure(Properties properties, StreamingCommand command,
+ DataCollector endOfPipe)
+ throws IOException, ExecException {
+ super.configure(properties, command, endOfPipe);
+
+ // Chmod +x the executable
+ File executable = new File(command.getExecutable());
+ if (executable.isAbsolute()) {
+ // we don't own it. Hope it is executable ...
+ } else {
+ try {
+ FileUtil.chmod(executable.toString(), "a+x");
+ } catch (InterruptedException ie) {
+ throw new ExecException(ie);
+ }
+ }
+
+ // Save the output directory for the Pig Script
+ scriptOutputDir = properties.getProperty("pig.streaming.task.output.dir");
+ scriptLogDir = properties.getProperty("pig.streaming.log.dir");
+
+ // Save the taskid
+ taskId = properties.getProperty("pig.streaming.task.id");
+
+ // Save a copy of the JobConf
+ job = PigMapReduce.getPigContext().getJobConf();
+ }
+
+ protected void exec() throws IOException {
+ // Create the HDFS file for the stderr of the task, if necessary
+ if (writeErrorToHDFS(command.getLogFilesLimit(), taskId)) {
+ try {
+ Path errorFile =
+ new Path(new Path(scriptLogDir, command.getLogDir()), taskId);
+ errorStream =
+ errorFile.getFileSystem(job).create(errorFile);
+ } catch (IOException ie) {
+ // Don't fail the task if we couldn't save it's stderr on HDFS
+ System.err.println("Failed to create stderr file of task: " +
+ taskId + " in HDFS at " + scriptLogDir +
+ " with " + ie);
+ errorStream = null;
+ }
+ }
+
+ // Header for stderr file of the task
+ writeDebugHeader();
+
+ // Exec the command ...
+ super.exec();
+ }
+
+ public void close() throws IOException, ExecException {
+ super.close();
+
+ // Footer for stderr file of the task
+ writeDebugFooter();
+
+ // Copy the secondary outputs of the task to HDFS
+ Path scriptOutputDir = new Path(this.scriptOutputDir);
+ FileSystem fs = scriptOutputDir.getFileSystem(job);
+ List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+ if (outputSpecs != null) {
+ for (int i=1; i < outputSpecs.size(); ++i) {
+ String fileName = outputSpecs.get(i).getName();
+ try {
+ fs.copyFromLocalFile(false, true, new Path(fileName),
+ new Path(scriptOutputDir,
+ taskId+"-"+fileName)
+ );
+ } catch (IOException ioe) {
+ System.err.println("Failed to save secondary output '" +
+ fileName + "' of task: " + taskId +
+ " with " + ioe);
+ throw new ExecException(ioe);
+ }
+ }
+ }
+
+ // Close the stderr file on HDFS
+ if (errorStream != null) {
+ errorStream.close();
+ }
+ }
+
+ /**
+ * Should the stderr data of this task be persisted on HDFS?
+ *
+ * @param limit maximum number of tasks whose stderr log-files are persisted
+ * @param taskId id of the task
+ * @return <code>true</code> if stderr data of task should be persisted on
+ * HDFS, <code>false</code> otherwise
+ */
+ private boolean writeErrorToHDFS(int limit, String taskId) {
+ // These are hard-coded begin/end offsets a Hadoop *taskid*
+ int beginIndex = 25, endIndex = 31;
+
+ int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+ return command.getPersistStderr() && tipId < command.getLogFilesLimit();
+ }
+
+ protected void processError(String error) {
+ super.processError(error);
+
+ try {
+ if (errorStream != null) {
+ errorStream.writeBytes(error);
+ }
+ } catch (IOException ioe) {
+ super.processError("Failed to save error logs to HDFS with: " +
+ ioe);
+ }
+ }
+
+ private void writeDebugHeader() {
+ processError("===== Task Information Header =====" );
+
+ processError("\nCommand: " + command.getExecutable());
+ processError("\nStart time: " + new Date(System.currentTimeMillis()));
+ processError("\nInput-split file: " + job.get("map.input.file"));
+ processError("\nInput-split start-offset: " +
+ job.getLong("map.input.start", -1));
+ processError("\nInput-split length: " +
+ job.getLong("map.input.length", -1));
+
+ processError("\n===== * * * =====\n");
+ }
+
+ private void writeDebugFooter() {
+ processError("===== Task Information Footer =====");
+
+ processError("\nEnd time: " + new Date(System.currentTimeMillis()));
+ processError("\nExit code: " + exitCode);
+
+ List<HandleSpec> inputSpecs = command.getHandleSpecs(Handle.INPUT);
+ HandleSpec inputSpec =
+ (inputSpecs != null) ? inputSpecs.get(0) : null;
+ processError("\nInput bytes: " + inputBytes + " bytes " +
+ ((inputSpec != null) ?
+ "(" + inputSpec.getName() + " using " +
+ inputSpec.getSpec() + ")"
+ : ""));
+
+ List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+ HandleSpec outputSpec =
+ (outputSpecs != null) ? outputSpecs.get(0) : null;
+ processError("\nOutput bytes: " + outputBytes + " bytes " +
+ ((outputSpec != null) ?
+ "(" + outputSpec.getName() + " using " +
+ outputSpec.getSpec() + ")"
+ : ""));
+ if (outputSpecs != null) {
+ for (int i=1; i < outputSpecs.size(); ++i) {
+ HandleSpec spec = outputSpecs.get(i);
+ processError("\n " + new File(spec.getName()).length()
+ + " bytes using " + spec.getSpec());
+ }
+ }
+
+ processError("\n===== * * * =====\n");
+ }
+}
+
+
\ No newline at end of file
Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link DefaultInputHandler} handles the input for the Pig-Streaming
+ * executable in a {@link InputType#SYNCHRONOUS} manner by feeding it input
+ * via its <code>stdin</code>.
+ */
+public class DefaultInputHandler extends InputHandler {
+
+ OutputStream stdin;
+
+ public DefaultInputHandler() {
+ serializer = new PigStorage();
+ }
+
+ public DefaultInputHandler(HandleSpec spec) {
+ serializer = (StoreFunc)PigContext.instantiateFuncFromSpec(spec.spec);
+ }
+
+ public InputType getInputType() {
+ return InputType.SYNCHRONOUS;
+ }
+
+ public void bindTo(OutputStream os) throws IOException {
+ stdin = os;
+ super.bindTo(stdin);
+ }
+
+ public void close() throws IOException {
+ super.close();
+ stdin.flush();
+ stdin.close();
+ stdin = null;
+ }
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link FileOutputHandler} handles the output from the Pig-Streaming
+ * executable in an {@link OutputType#SYNCHRONOUS} manner by reading its output
+ * via its <code>stdout</code>.
+ */
+public class DefaultOutputHandler extends OutputHandler {
+
+ public DefaultOutputHandler() {
+ deserializer = new PigStorage();
+ }
+
+ public DefaultOutputHandler(HandleSpec spec) {
+ deserializer = (LoadFunc)PigContext.instantiateFuncFromSpec(spec.spec);
+ }
+
+ public OutputType getOutputType() {
+ return OutputType.SYNCHRONOUS;
+ }
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.streaming.InputHandler.InputType;
+import org.apache.pig.impl.streaming.OutputHandler.OutputType;
+
+/**
+ * {@link ExecutableManager} manages an external executable which processes data
+ * in a Pig query.
+ *
+ * The <code>ExecutableManager</code> is responsible for startup/teardown of the
+ * external process and also for managing it.
+ * It feeds input records to the executable via it's <code>stdin</code>,
+ * collects the output records from the <code>stdout</code> and also diagnostic
+ * information from the <code>stdout</code>.
+ */
+public class ExecutableManager {
+ private static final Log LOG =
+ LogFactory.getLog(ExecutableManager.class.getName());
+ private static final int SUCCESS = 0;
+
+ protected StreamingCommand command; // Streaming command to be run
+ String[] argv; // Parsed/split commands
+
+ Process process; // Handle to the process
+ protected int exitCode = -127; // Exit code of the process
+
+ protected DataOutputStream stdin; // stdin of the process
+
+ ProcessOutputThread stdoutThread; // thread to get process output
+ InputStream stdout; // stdout of the process
+ // interpret the process' output
+
+ ProcessErrorThread stderrThread; // thread to get process output
+ InputStream stderr; // stderr of the process
+
+ DataCollector endOfPipe;
+
+ // Input/Output handlers
+ InputHandler inputHandler;
+ OutputHandler outputHandler;
+
+ Properties properties;
+
+ protected long inputBytes = 0;
+ protected long outputBytes = 0;
+
+ public ExecutableManager() {}
+
+ public void configure(Properties properties, StreamingCommand command,
+ DataCollector endOfPipe)
+ throws IOException, ExecException {
+ this.properties = properties;
+
+ this.command = command;
+ this.argv = this.command.getCommandArgs();
+
+ // Create the input/output handlers
+ this.inputHandler = HandlerFactory.createInputHandler(command);
+ this.outputHandler =
+ HandlerFactory.createOutputHandler(command);
+
+ // Successor
+ this.endOfPipe = endOfPipe;
+ }
+
+ public void close() throws IOException, ExecException {
+ // Close the InputHandler, which in some cases lets the process
+ // terminate
+ inputHandler.close();
+
+ // Check if we need to start the process now ...
+ if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
+ exec();
+ }
+
+ // Wait for the process to exit and the stdout/stderr threads to complete
+ try {
+ exitCode = process.waitFor();
+
+ if (stdoutThread != null) {
+ stdoutThread.join(0);
+ }
+ if (stderrThread != null) {
+ stderrThread.join(0);
+ }
+
+ } catch (InterruptedException ie) {}
+
+ // Clean up the process
+ process.destroy();
+
+ LOG.debug("Process exited with: " + exitCode);
+ if (exitCode != SUCCESS) {
+ throw new ExecException(command + " failed with exit status: " +
+ exitCode);
+ }
+
+ if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
+ // Trigger the outputHandler
+ outputHandler.bindTo(null);
+
+ // Start the thread to process the output and wait for
+ // it to terminate
+ stdoutThread = new ProcessOutputThread(outputHandler);
+ stdoutThread.start();
+
+ try {
+ stdoutThread.join(0);
+ } catch (InterruptedException ie) {}
+ }
+
+ }
+
+ protected void exec() throws IOException {
+ // Unquote command-line arguments ...
+ for (int i=0; i < argv.length; ++i) {
+ String arg = argv[i];
+ if (arg.charAt(0) == '\'' && arg.charAt(arg.length()-1) == '\'') {
+ argv[i] = arg.substring(1, arg.length()-1);
+ }
+ }
+
+ // Start the external process
+ ProcessBuilder processBuilder = new ProcessBuilder(argv);
+ process = processBuilder.start();
+ LOG.debug("Started the process for command: " + command);
+
+ // Pick up the process' stderr stream and start the thread to
+ // process the stderr stream
+ stderr =
+ new DataInputStream(new BufferedInputStream(process.getErrorStream()));
+ stderrThread = new ProcessErrorThread();
+ stderrThread.start();
+
+ // Check if we need to handle the process' stdout directly
+ if (outputHandler.getOutputType() == OutputType.SYNCHRONOUS) {
+ // Get hold of the stdout of the process
+ stdout =
+ new DataInputStream(new BufferedInputStream(process.getInputStream()));
+
+ // Bind the stdout to the OutputHandler
+ outputHandler.bindTo(stdout);
+
+ // Start the thread to process the executable's stdout
+ stdoutThread = new ProcessOutputThread(outputHandler);
+ stdoutThread.start();
+ }
+ }
+
+ public void run() throws IOException {
+ // Check if we need to exec the process NOW ...
+ if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
+ return;
+ }
+
+ // Start the executable ...
+ exec();
+ stdin =
+ new DataOutputStream(new BufferedOutputStream(process.getOutputStream()));
+ inputHandler.bindTo(stdin);
+ }
+
+ public void add(Datum d) throws IOException {
+ // Pass the serialized tuple to the executable via the InputHandler
+ Tuple t = (Tuple)d;
+ inputHandler.putNext(t);
+ inputBytes += t.getMemorySize();
+ }
+
+ /**
+ * Workhorse to process the output of the managed process.
+ *
+ * The <code>ExecutableManager</code>, by default, just pushes the received
+ * <code>Datum</code> into eval-pipeline to be processed by the successor.
+ *
+ * @param d <code>Datum</code> to process
+ */
+ protected void processOutput(Datum d) {
+ endOfPipe.add(d);
+ }
+
+ class ProcessOutputThread extends Thread {
+
+ OutputHandler outputHandler;
+
+ ProcessOutputThread(OutputHandler outputHandler) {
+ setDaemon(true);
+ this.outputHandler = outputHandler;
+ }
+
+ public void run() {
+ try {
+ // Read tuples from the executable and push them down the pipe
+ Tuple tuple = null;
+ while ((tuple = outputHandler.getNext()) != null) {
+ processOutput(tuple);
+ outputBytes += tuple.getMemorySize();
+ }
+
+ outputHandler.close();
+ } catch (Throwable t) {
+ LOG.warn(t);
+ try {
+ outputHandler.close();
+ } catch (IOException ioe) {
+ LOG.info(ioe);
+ }
+ throw new RuntimeException(t);
+ }
+ }
+ }
+
+ /**
+ * Workhorse to process the stderr stream of the managed process.
+ *
+ * By default <code>ExecuatbleManager</code> just sends out the received
+ * error message to the <code>stderr</code> of itself.
+ *
+ * @param error error message from the managed process.
+ */
+ protected void processError(String error) {
+ // Just send it out to our stderr
+ System.err.print(error);
+ }
+
+ class ProcessErrorThread extends Thread {
+
+ public ProcessErrorThread() {
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ String error;
+ BufferedReader reader =
+ new BufferedReader(new InputStreamReader(stderr));
+ while ((error = reader.readLine()) != null) {
+ processError(error+"\n");
+ }
+
+ if (stderr != null) {
+ stderr.close();
+ LOG.debug("ProcessErrorThread done");
+ }
+ } catch (Throwable th) {
+ LOG.warn(th);
+ try {
+ if (stderr != null) {
+ stderr.close();
+ }
+ } catch (IOException ioe) {
+ LOG.info(ioe);
+ throw new RuntimeException(th);
+ }
+ }
+ }
+ }
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link FileInputHandler} handles the input for the Pig-Streaming
+ * executable in an {@link InputType#ASYNCHRONOUS} manner by feeding it input
+ * via an external file specified by the user.
+ */
+public class FileInputHandler extends InputHandler {
+
+ String fileName;
+ OutputStream fileOutStream;
+
+ public FileInputHandler(HandleSpec handleSpec) throws ExecException {
+ fileName = handleSpec.name;
+ serializer =
+ (StoreFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec);
+
+ try {
+ fileOutStream = new FileOutputStream(new File(fileName));
+ super.bindTo(fileOutStream);
+ } catch (IOException fnfe) {
+ throw new ExecException(fnfe);
+ }
+ }
+
+ public InputType getInputType() {
+ return InputType.ASYNCHRONOUS;
+ }
+
+ public void bindTo(OutputStream os) throws IOException {
+ throw new UnsupportedOperationException("Cannot call bindTo on " +
+ "FileInputHandler");
+ }
+
+ public void close() throws IOException {
+ super.close();
+ fileOutStream.flush();
+ fileOutStream.close();
+ fileOutStream = null;
+ }
+
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link FileOutputHandler} handles the output from the Pig-Streaming
+ * executable in an {@link OutputType#ASYNCHRONOUS} manner by reading it from
+ * an external file specified by the user.
+ */
+public class FileOutputHandler extends OutputHandler {
+
+ String fileName;
+ InputStream fileInStream;
+
+ public FileOutputHandler(HandleSpec handleSpec) throws ExecException {
+ fileName = handleSpec.name;
+ deserializer =
+ (LoadFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec);
+ }
+
+ public OutputType getOutputType() {
+ return OutputType.ASYNCHRONOUS;
+ }
+
+ public void bindTo(InputStream is) throws IOException {
+ // This is a trigger to start processing the output from the file ...
+ fileInStream = new FileInputStream(new File(fileName));
+ super.bindTo(fileInStream);
+ }
+
+ public void close() throws IOException {
+ super.close();
+ fileInStream.close();
+ fileInStream = null;
+ }
+
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * Factory to create an {@link InputHandler} or {@link OutputHandler}
+ * depending on the specification of the {@link StreamingCommand}.
+ */
+public class HandlerFactory {
+
+ /**
+ * Create an <code>InputHandler</code> for the given input specification
+ * of the <code>StreamingCommand</code>.
+ *
+ * @param command <code>StreamingCommand</code>
+ * @return <code>InputHandler</code> for the given input specification
+ * @throws ExecException
+ */
+ public static InputHandler createInputHandler(StreamingCommand command)
+ throws ExecException {
+ List<HandleSpec> inputSpecs = command.getHandleSpecs(Handle.INPUT);
+
+ HandleSpec in = null;
+ if (inputSpecs == null || (in = inputSpecs.get(0)) == null) {
+ return new DefaultInputHandler();
+ }
+
+ return (in.name.equals("stdin")) ? new DefaultInputHandler(in) :
+ new FileInputHandler(in);
+ }
+
+ /**
+ * Create an <code>OutputHandler</code> for the given output specification
+ * of the <code>StreamingCommand</code>.
+ *
+ * @param command <code>StreamingCommand</code>
+ * @return <code>OutputHandler</code> for the given output specification
+ * @throws ExecException
+ */
+ public static OutputHandler createOutputHandler(StreamingCommand command)
+ throws ExecException {
+ List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+
+ HandleSpec out = null;
+ if (outputSpecs == null || (out = outputSpecs.get(0)) == null) {
+ return new DefaultOutputHandler();
+ }
+
+ return (out.name.equals("stdout")) ? new DefaultOutputHandler(out) :
+ new FileOutputHandler(out);
+ }
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * {@link InputHandler} is responsible for handling the input to the
+ * Pig-Streaming external command.
+ *
+ * The managed executable could be fed input in a {@link InputType#SYNCHRONOUS}
+ * manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS}
+ * manner via an external file which is subsequently read by the executable.
+ */
+public abstract class InputHandler {
+ /**
+ *
+ */
+ public enum InputType {SYNCHRONOUS, ASYNCHRONOUS}
+
+ /*
+ * The serializer to be used to send data to the managed process.
+ *
+ * It is the responsibility of the concrete sub-classes to setup and
+ * manage the serializer.
+ */
+ protected StoreFunc serializer;
+
+ /**
+ * Get the handled <code>InputType</code>
+ * @return the handled <code>InputType</code>
+ */
+ public abstract InputType getInputType();
+
+ /**
+ * Send the given input <code>Tuple</code> to the managed executable.
+ *
+ * @param t input <code>Tuple</code>
+ * @throws IOException
+ */
+ public void putNext(Tuple t) throws IOException {
+ serializer.putNext(t);
+ }
+
+ /**
+ * Close the <code>InputHandler</code> since there is no more input
+ * to be sent to the managed process.
+ *
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ serializer.finish();
+ }
+
+ /**
+ * Bind the <code>InputHandler</code> to the <code>OutputStream</code>
+ * from which it reads input and sends it to the managed process.
+ *
+ * @param os <code>OutputStream</code> from which to read input data for the
+ * managed process
+ * @throws IOException
+ */
+ public void bindTo(OutputStream os) throws IOException {
+ serializer.bindTo(os);
+ }
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+/**
+ * {@link OutputHandler} is responsible for handling the output of the
+ * Pig-Streaming external command.
+ *
+ * The output of the managed executable could be fetched in a
+ * {@link OutputType#SYNCHRONOUS} manner via its <code>stdout</code> or in an
+ * {@link OutputType#ASYNCHRONOUS} manner via an external file to which the
+ * process wrote its output.
+ */
+public abstract class OutputHandler {
+ public enum OutputType {SYNCHRONOUS, ASYNCHRONOUS}
+
+ /*
+ * The deserializer to be used to send data to the managed process.
+ *
+ * It is the responsibility of the concrete sub-classes to setup and
+ * manage the deserializer.
+ */
+ protected LoadFunc deserializer;
+
+ /**
+ * Get the handled <code>OutputType</code>.
+ * @return the handled <code>OutputType</code>
+ */
+ public abstract OutputType getOutputType();
+
+ /**
+ * Bind the <code>OutputHandler</code> to the <code>InputStream</code>
+ * from which to read the output data of the managed process.
+ *
+ * @param is <code>InputStream</code> from which to read the output data
+ * of the managed process
+ * @throws IOException
+ */
+ public void bindTo(InputStream is) throws IOException {
+ deserializer.bindTo("", new BufferedPositionedInputStream(is), 0,
+ Long.MAX_VALUE);
+ }
+
+ /**
+ * Get the next output <code>Tuple</code> of the managed process.
+ *
+ * @return the next output <code>Tuple</code> of the managed process
+ * @throws IOException
+ */
+ public Tuple getNext() throws IOException {
+ return deserializer.getNext();
+ }
+
+ /**
+ * Close the <code>OutputHandler</code>.
+ * @throws IOException
+ */
+ public void close() throws IOException {}
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,347 @@
+package org.apache.pig.impl.streaming;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.pig.builtin.PigStorage;
+
+
+/**
+ * {@link StreamingCommand} represents the specification of an external
+ * command to be executed in a Pig Query.
+ *
+ * <code>StreamingCommand</code> encapsulates all relevant details of the
+ * command specified by the user either directly via the <code>STREAM</code>
+ * operator or indirectly via a <code>DEFINE</code> operator. It includes
+ * details such as input/output/error specifications and also files to be
+ * shipped to the cluster and files to be cached.
+ */
+public class StreamingCommand implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ // External command to be executed and it's parsed components
+ String executable;
+ String[] argv;
+
+ // Files to be shipped to the cluster in-order to be executed
+ List<String> shipSpec = new LinkedList<String>();
+
+ // Files to be shipped to the cluster in-order to be executed
+ List<String> cacheSpec = new LinkedList<String>();
+
+ /**
+ * Handle to communicate with the external process.
+ */
+ public enum Handle {INPUT, OUTPUT}
+
+ /**
+ * Map from the the stdin/stdout/stderr handles to their specifications
+ */
+ Map<Handle, List<HandleSpec>> handleSpecs =
+ new TreeMap<Handle, List<HandleSpec>>();
+
+ // Should the stderr of the process be persisted?
+ boolean persistStderr = false;
+
+ // Directory where the process's stderr logs should be persisted.
+ String logDir;
+
+ // Limit on the number of persisted log-files
+ int logFilesLimit = 100;
+ public static final int MAX_TASKS = 100;
+
+ boolean shipFiles = true;
+
+ /**
+ * Create a new <code>StreamingCommand</code> with the given command.
+ *
+ * @param command streaming command to be executed
+ * @param argv parsed arguments of the <code>command</code>
+ */
+ public StreamingCommand(String[] argv) {
+ this.argv = argv;
+
+ // Assume that argv[0] is the executable
+ this.executable = this.argv[0];
+ }
+
+ /**
+ * Get the command to be executed.
+ *
+ * @return the command to be executed
+ */
+ public String getExecutable() {
+ return executable;
+ }
+
+ /**
+ * Set the executable for the <code>StreamingCommand</code>.
+ *
+ * @param executable the executable for the <code>StreamingCommand</code>
+ */
+ public void setExecutable(String executable) {
+ this.executable = executable;
+ }
+
+ /**
+ * Set the command line arguments for the <code>StreamingCommand</code>.
+ *
+ * @param argv the command line arguments for the
+ * <code>StreamingCommand</code>
+ */
+ public void setCommandArgs(String[] argv) {
+ this.argv = argv;
+ }
+
+ /**
+ * Get the parsed command arguments.
+ *
+ * @return the parsed command arguments as <code>String[]</code>
+ */
+ public String[] getCommandArgs() {
+ return argv;
+ }
+
+ /**
+ * Get the list of files which need to be shipped to the cluster.
+ *
+ * @return the list of files which need to be shipped to the cluster
+ */
+ public List<String> getShipSpecs() {
+ return shipSpec;
+ }
+
+ /**
+ * Get the list of files which need to be cached on the execute nodes.
+ *
+ * @return the list of files which need to be cached on the execute nodes
+ */
+ public List<String> getCacheSpecs() {
+ return cacheSpec;
+ }
+
+ /**
+ * Add a file to be shipped to the cluster.
+ *
+ * Users can use this to distribute executables and other necessary files
+ * to the clusters.
+ *
+ * @param path path of the file to be shipped to the cluster
+ */
+ public void addPathToShip(String path) {
+ shipSpec.add(path);
+ }
+
+ /**
+ * Add a file to be cached on execute nodes on the cluster. The file is
+ * assumed to be available at the shared filesystem.
+ *
+ * @param path path of the file to be cached on the execute nodes
+ */
+ public void addPathToCache(String path) {
+ cacheSpec.add(path);
+ }
+
+ /**
+ * Attach a {@link HandleSpec} to a given {@link Handle}
+ * @param handle <code>Handle</code> to which the specification is to
+ * be attached.
+ * @param handleSpec <code>HandleSpec</code> for the given handle.
+ */
+ public void addHandleSpec(Handle handle, HandleSpec handleSpec) {
+ List<HandleSpec> handleSpecList = handleSpecs.get(handle);
+
+ if (handleSpecList == null) {
+ handleSpecList = new LinkedList<HandleSpec>();
+ handleSpecs.put(handle, handleSpecList);
+ }
+
+ handleSpecList.add(handleSpec);
+ }
+
+ /**
+ * Get specifications for the given <code>Handle</code>.
+ *
+ * @param handle <code>Handle</code> of the stream
+ * @return specification for the given <code>Handle</code>
+ */
+ public List<HandleSpec> getHandleSpecs(Handle handle) {
+ return handleSpecs.get(handle);
+ }
+
+ /**
+ * Should the stderr of the managed process be persisted?
+ *
+ * @return <code>true</code> if the stderr of the managed process should be
+ * persisted, <code>false</code> otherwise.
+ */
+ public boolean getPersistStderr() {
+ return persistStderr;
+ }
+
+ /**
+ * Specify if the stderr of the managed process should be persisted.
+ *
+ * @param persistStderr <code>true</code> if the stderr of the managed
+ * process should be persisted, else <code>false</code>
+ */
+ public void setPersistStderr(boolean persistStderr) {
+ this.persistStderr = persistStderr;
+ }
+
+ /**
+ * Get the directory where the log-files of the command are persisted.
+ *
+ * @return the directory where the log-files of the command are persisted
+ */
+ public String getLogDir() {
+ return logDir;
+ }
+
+ /**
+ * Set the directory where the log-files of the command are persisted.
+ *
+ * @param logDir the directory where the log-files of the command are persisted
+ */
+ public void setLogDir(String logDir) {
+ this.logDir = logDir;
+ if (this.logDir.startsWith("/")) {
+ this.logDir = this.logDir.substring(1);
+ }
+ setPersistStderr(true);
+ }
+
+ /**
+ * Get the maximum number of tasks whose stderr logs files are persisted.
+ *
+ * @return the maximum number of tasks whose stderr logs files are persisted
+ */
+ public int getLogFilesLimit() {
+ return logFilesLimit;
+ }
+
+ /**
+ * Set the maximum number of tasks whose stderr logs files are persisted.
+ * @param logFilesLimit the maximum number of tasks whose stderr logs files
+ * are persisted
+ */
+ public void setLogFilesLimit(int logFilesLimit) {
+ this.logFilesLimit = Math.min(MAX_TASKS, logFilesLimit);
+ }
+
+ /**
+ * Set whether files should be shipped or not.
+ *
+ * @param shipFiles <code>true</code> if files of this command should be
+ * shipped, <code>false</code> otherwise
+ */
+ public void setShipFiles(boolean shipFiles) {
+ this.shipFiles = shipFiles;
+ }
+
+ /**
+ * Get whether files for this command should be shipped or not.
+ *
+ * @return <code>true</code> if files of this command should be shipped,
+ * <code>false</code> otherwise
+ */
+ public boolean getShipFiles() {
+ return shipFiles;
+ }
+
+ public String toString() {
+ return executable;
+ }
+
+ /**
+ * Specification about the usage of the {@link Handle} to communicate
+ * with the external process.
+ *
+ * It specifies the stream-handle which can be one of <code>stdin</code>/
+ * <code>stdout</code>/<code>stderr</code> or a named file and also the
+ * serializer/deserializer specification to be used to read/write data
+ * to/from the stream.
+ */
+ public static class HandleSpec
+ implements Comparable<HandleSpec>, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ String name;
+ String spec;
+
+ /**
+ * Create a new {@link HandleSpec} with a given name using the default
+ * {@link PigStorage} serializer/deserializer.
+ *
+ * @param handleName name of the handle (one of <code>stdin</code>,
+ * <code>stdout</code> or a file-path)
+ */
+ public HandleSpec(String handleName) {
+ this(handleName, PigStorage.class.getName());
+ }
+
+ /**
+ * Create a new {@link HandleSpec} with a given name using the default
+ * {@link PigStorage} serializer/deserializer.
+ *
+ * @param handleName name of the handle (one of <code>stdin</code>,
+ * <code>stdout</code> or a file-path)
+ * @param spec serializer/deserializer spec
+ */
+ public HandleSpec(String handleName, String spec) {
+ this.name = handleName;
+ this.spec = spec;
+ }
+
+ public int compareTo(HandleSpec o) {
+ return this.name.compareTo(o.name);
+ }
+
+ public String toString() {
+ return name + " using " + spec;
+ }
+
+ /**
+ * Get the <b>name</b> of the <code>HandleSpec</code>.
+ *
+ * @return the <b>name</b> of the <code>HandleSpec</code> (one of
+ * <code>stdin</code>, <code>stdout</code> or a file-path)
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Set the <b>name</b> of the <code>HandleSpec</code>.
+ *
+ * @param name <b>name</b> of the <code>HandleSpec</code> (one of
+ * <code>stdin</code>, <code>stdout</code> or a file-path)
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get the serializer/deserializer spec of the <code>HandleSpec</code>.
+ *
+ * @return the serializer/deserializer spec of the
+ * <code>HandleSpec</code>
+ */
+ public String getSpec() {
+ return spec;
+ }
+
+ /**
+ * Set the serializer/deserializer spec of the <code>HandleSpec</code>.
+ *
+ * @param spec the serializer/deserializer spec of the
+ * <code>HandleSpec</code>
+ */
+ public void setSpec(String spec) {
+ this.spec = spec;
+ }
+ }
+}
\ No newline at end of file