You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/04/01 23:22:42 UTC
svn commit: r643583 - in /incubator/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/
src/org/apache/pig/backend/hadoop/streaming/ src/org/apache/pig/bu...
Author: olga
Date: Tue Apr 1 14:22:38 2008
New Revision: 643583
URL: http://svn.apache.org/viewvc?rev=643583&view=rev
Log:
PIG-94: M3 of streaming
Added:
incubator/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/Optimizer.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
incubator/pig/trunk/test/org/apache/pig/test/TestBinaryStorage.java
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/PigServer.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Tue Apr 1 14:22:38 2008
@@ -195,3 +195,5 @@
PIG-122: Added build and src-gen to the list of ignore files in
the top level directory (joa23 via gates).
+
+ PIG-94: M3 code update for streaming
Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Tue Apr 1 14:22:38 2008
@@ -51,6 +51,9 @@
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
+import org.apache.pig.impl.logicalLayer.optimizer.streaming.LoadOptimizer;
+import org.apache.pig.impl.logicalLayer.optimizer.streaming.StoreOptimizer;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.parser.QueryParser;
import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
@@ -275,7 +278,13 @@
// Check if we just processed a LOStore i.e. STORE
if (op instanceof LOStore) {
- runQuery(lp);
+ try {
+ optimizeAndRunQuery(lp);
+ }
+ catch (ExecException e) {
+ throw WrappedIOException.wrap("Unable to store alias " +
+ lp.getAlias(), e);
+ }
}
}
@@ -306,12 +315,10 @@
// execution.
LogicalPlan readFrom = (LogicalPlan) aliases.get(id);
-
+
+ // Run
try {
- ExecPhysicalPlan pp =
- pigContext.getExecutionEngine().compile(readFrom, null);
-
- ExecJob job = pigContext.getExecutionEngine().execute(pp);
+ ExecJob job = optimizeAndRunQuery(readFrom);
// invocation of "execute" is synchronous!
if (job.getStatus() == JOB_STATUS.COMPLETED) {
@@ -365,21 +372,36 @@
func,
pigContext);
- runQuery(storePlan);
- }
+ // Optimize
+ Optimizer optimizer = new LoadOptimizer();
+ optimizer.optimize(readFrom);
- private void runQuery(LogicalPlan storePlan) throws IOException {
- try {
- ExecPhysicalPlan pp =
- pigContext.getExecutionEngine().compile(storePlan, null);
- pigContext.getExecutionEngine().execute(pp);
+ try {
+ optimizeAndRunQuery(storePlan);
}
catch (ExecException e) {
throw WrappedIOException.wrap("Unable to store alias " +
- storePlan.getAlias(), e);
+ storePlan.getAlias(), e);
+
}
}
+
+    /**
+     * Applies the streaming load and store optimizers to the given plan,
+     * then compiles it with the current execution engine and runs it.
+     *
+     * The boolean results of the optimizers are not inspected here.
+     *
+     * @param root logical plan to optimize and execute
+     * @return the job handed back by the execution engine
+     * @throws ExecException if compiling or executing the plan fails
+     */
+    private ExecJob optimizeAndRunQuery(LogicalPlan root) throws ExecException {
+        // Load-side optimization runs before store-side optimization
+        new LoadOptimizer().optimize(root);
+        new StoreOptimizer().optimize(root);
+
+        // Compile to a physical plan and execute it
+        ExecPhysicalPlan physicalPlan =
+            pigContext.getExecutionEngine().compile(root, null);
+        return pigContext.getExecutionEngine().execute(physicalPlan);
+    }
+
/**
* Provide information on how a pig query will be executed. For now
* this information is very developer focussed, and probably not very
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Tue Apr 1 14:22:38 2008
@@ -193,6 +193,8 @@
FileSpec fileSpec = new FileSpec(filename, loLoad.getInputFileSpec().getFuncSpec());
pom.addInputFile(fileSpec);
pom.mapParallelism = Math.max(pom.mapParallelism, lo.getRequestedParallelism());
+ pom.setProperty("pig.input.splitable",
+ Boolean.toString(loLoad.isSplitable()));
return pom.getOperatorKey();
}
else if (lo instanceof LOStore) {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Tue Apr 1 14:22:38 2008
@@ -259,6 +259,14 @@
properties.putAll(spec.getProperties());
}
+    /**
+     * Stores a single property on this map-reduce operator.
+     *
+     * @param key   name of the property
+     * @param value value to associate with <code>key</code>
+     */
+    public void setProperty(String key, String value) {
+        this.properties.setProperty(key, value);
+    }
+
+    /**
+     * Reads a property from this map-reduce operator.
+     *
+     * @param key name of the property
+     * @return the value as returned by <code>properties.getProperty(key)</code>
+     */
+    public String getProperty(String key) {
+        return this.properties.getProperty(key);
+    }
+
public void visit(POVisitor v) {
v.visitMapreduce(this);
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java Tue Apr 1 14:22:38 2008
@@ -49,6 +49,7 @@
public class PigInputFormat implements InputFormat<Text, Tuple>, JobConfigurable {
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ boolean isSplitable = job.getBoolean("pig.input.splitable", false);
ArrayList<FileSpec> inputs = (ArrayList<FileSpec>) ObjectSerializer.deserialize(job.get("pig.inputs"));
ArrayList<EvalSpec> mapFuncs = (ArrayList<EvalSpec>) ObjectSerializer.deserialize(job.get("pig.mapFuncs",""));
@@ -109,8 +110,9 @@
long size = fs.getFileStatus(fullPath).getLen();
long pos = 0;
String name = paths.get(j).getName();
- if (name.endsWith(".gz")) {
- // Anything that ends with a ".gz" we must process as a complete file
+ if (name.endsWith(".gz") || !isSplitable) {
+ // Anything that ends with a ".gz" or can't be split
+ // we must process as a complete file
splits.add(new PigSplit(pigContext, fs, fullPath, parser, groupFuncs==null ? null : groupFuncs.get(i), mapFuncs.get(i), i, 0, size));
} else {
while (pos < size) {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Tue Apr 1 14:22:38 2008
@@ -108,16 +108,23 @@
setupMapPipe(properties, reporter);
- // allocate key & value instances that are re-used for all entries
- WritableComparable key = input.createKey();
- Writable value = input.createValue();
- while (input.next(key, value)) {
- evalPipe.add((Tuple) value);
- }
- evalPipe.finishPipe(); // EOF marker
- evalPipe= null;
- if (pigWriter != null) {
- pigWriter.close(reporter);
+ try {
+ // allocate key & value instances that are re-used for all entries
+ WritableComparable key = input.createKey();
+ Writable value = input.createValue();
+ while (input.next(key, value)) {
+ evalPipe.add((Tuple) value);
+ }
+ } finally {
+ try {
+ evalPipe.finishPipe(); // EOF marker
+ evalPipe = null;
+ } finally {
+ // Close the writer
+ if (pigWriter != null) {
+ pigWriter.close(reporter);
+ }
+ }
}
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Tue Apr 1 14:22:38 2008
@@ -111,35 +111,37 @@
}
public void close() throws IOException, ExecException {
- super.close();
-
- // Footer for stderr file of the task
- writeDebugFooter();
-
- // Copy the secondary outputs of the task to HDFS
- Path scriptOutputDir = new Path(this.scriptOutputDir);
- FileSystem fs = scriptOutputDir.getFileSystem(job);
- List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
- if (outputSpecs != null) {
- for (int i=1; i < outputSpecs.size(); ++i) {
- String fileName = outputSpecs.get(i).getName();
- try {
- fs.copyFromLocalFile(false, true, new Path(fileName),
- new Path(scriptOutputDir,
- taskId+"-"+fileName)
- );
- } catch (IOException ioe) {
- System.err.println("Failed to save secondary output '" +
- fileName + "' of task: " + taskId +
- " with " + ioe);
- throw new ExecException(ioe);
+ try {
+ super.close();
+
+ // Copy the secondary outputs of the task to HDFS
+ Path scriptOutputDir = new Path(this.scriptOutputDir);
+ FileSystem fs = scriptOutputDir.getFileSystem(job);
+ List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+ if (outputSpecs != null) {
+ for (int i=1; i < outputSpecs.size(); ++i) {
+ String fileName = outputSpecs.get(i).getName();
+ try {
+ fs.copyFromLocalFile(false, true, new Path(fileName),
+ new Path(scriptOutputDir,
+ taskId+"-"+fileName)
+ );
+ } catch (IOException ioe) {
+ System.err.println("Failed to save secondary output '" +
+ fileName + "' of task: " + taskId +
+ " with " + ioe);
+ throw new ExecException(ioe);
+ }
}
- }
}
-
- // Close the stderr file on HDFS
- if (errorStream != null) {
- errorStream.close();
+ } finally {
+ // Footer for stderr file of the task
+ writeDebugFooter();
+
+ // Close the stderr file on HDFS
+ if (errorStream != null) {
+ errorStream.close();
+ }
}
}
@@ -152,11 +154,14 @@
* HDFS, <code>false</code> otherwise
*/
private boolean writeErrorToHDFS(int limit, String taskId) {
- // These are hard-coded begin/end offsets a Hadoop *taskid*
- int beginIndex = 25, endIndex = 31;
-
- int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
- return command.getPersistStderr() && tipId < command.getLogFilesLimit();
+ if (command.getPersistStderr()) {
+ // These are hard-coded begin/end offsets a Hadoop *taskid*
+ int beginIndex = 25, endIndex = 31;
+
+ int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+ return tipId < command.getLogFilesLimit();
+ }
+ return false;
}
protected void processError(String error) {
@@ -175,7 +180,12 @@
private void writeDebugHeader() {
processError("===== Task Information Header =====" );
- processError("\nCommand: " + command.getExecutable());
+ StringBuffer sb = new StringBuffer();
+ for (String arg : command.getCommandArgs()) {
+ sb.append(arg);
+ sb.append(" ");
+ }
+ processError("\nCommand: " + sb.toString());
processError("\nStart time: " + new Date(System.currentTimeMillis()));
processError("\nInput-split file: " + job.get("map.input.file"));
processError("\nInput-split start-offset: " +
@@ -195,6 +205,7 @@
List<HandleSpec> inputSpecs = command.getHandleSpecs(Handle.INPUT);
HandleSpec inputSpec =
(inputSpecs != null) ? inputSpecs.get(0) : null;
+ processError("\nInput records: " + inputRecords);
processError("\nInput bytes: " + inputBytes + " bytes " +
((inputSpec != null) ?
"(" + inputSpec.getName() + " using " +
@@ -204,6 +215,7 @@
List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
HandleSpec outputSpec =
(outputSpecs != null) ? outputSpecs.get(0) : null;
+ processError("\nOutput records: " + outputRecords);
processError("\nOutput bytes: " + outputBytes + " bytes " +
((outputSpec != null) ?
"(" + outputSpec.getName() + " using " +
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Tue Apr 1 14:22:38 2008
@@ -95,4 +95,8 @@
out.write(Tuple.RECORD_3);
t.write(out);
}
+
+    /**
+     * Treats every <code>BinStorage</code> instance as equivalent to every
+     * other one; <code>null</code> and objects of any other type are not
+     * equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof BinStorage;
+    }
+
+    /**
+     * Consistent with {@link #equals(Object)}: since all instances are equal
+     * they must all share a single hash code.
+     */
+    @Override
+    public int hashCode() {
+        return BinStorage.class.hashCode();
+    }
}
Added: incubator/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java?rev=643583&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java Tue Apr 1 14:22:38 2008
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+/**
+ * {@link BinaryStorage} is a simple, as-is, serializer/deserializer pair.
+ *
+ * It is {@link LoadFunc} which loads all the given data from the given
+ * {@link InputStream} into a single {@link Tuple} and a {@link StoreFunc}
+ * which writes out all input data as a single <code>Tuple</code>.
+ *
+ * <code>BinaryStorage</code> is intended to work in cases where input files
+ * are to be sent in-whole for processing without any splitting and
+ * interpretation of their data.
+ */
+public class BinaryStorage implements LoadFunc, StoreFunc {
+ // LoadFunc
+ private static final int DEFAULT_BUFFER_SIZE = 64*1024;
+ protected int bufferSize = DEFAULT_BUFFER_SIZE;
+
+ protected BufferedPositionedInputStream in = null;
+ protected long offset = 0;
+ protected long end = Long.MAX_VALUE;
+
+ // StoreFunc
+ OutputStream out;
+
+ /**
+ * Create a <code>BinaryStorage</code> with default buffer size for reading
+ * inputs.
+ */
+ public BinaryStorage() {}
+
+ /**
+ * Create a <code>BinaryStorage</code> with the given buffer-size for
+ * reading inputs.
+ *
+ * @param bufferSize buffer size to be used
+ */
+ public BinaryStorage(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ public void bindTo(String fileName, BufferedPositionedInputStream in,
+ long offset, long end) throws IOException {
+ this.in = in;
+ this.offset = offset;
+ this.end = end;
+ }
+
+ public Tuple getNext() throws IOException {
+ // Sanity check
+ if (in == null || in.getPosition() > end) {
+ return null;
+ }
+
+ // Copy all data into the buffer
+ byte[] buffer = new byte[bufferSize];
+ int off = 0;
+ int len = bufferSize;
+ int n = 0;
+ while (len > 0 && (n = in.read(buffer, off, len)) != -1) {
+ off += n;
+ len -= n;
+ }
+
+ if (n == -1) {
+ // Copy out the part-buffer and send it
+ byte[] copy = new byte[off];
+ System.arraycopy(buffer, 0, copy, 0, copy.length);
+ buffer = copy;
+ }
+
+ // Create a new Tuple with one DataAtom field and return it,
+ // ensure that we return 'null' if we didn't get any data
+ if (off > 0) {
+ return new Tuple(new DataAtom(buffer));
+ }
+
+ return null;
+ }
+
+ public void bindTo(OutputStream out) throws IOException {
+ this.out = out;
+ }
+
+ public void finish() throws IOException {}
+
+ public void putNext(Tuple f) throws IOException {
+ // Pick up the first field of the Tuple, then it's
+ // raw-bytes and send it out
+ DataAtom dAtom = (DataAtom)(f.getAtomField(0));
+ byte[] data = dAtom.getValueBytes();
+ if (data.length > 0) {
+ out.write(dAtom.getValueBytes());
+ out.flush();
+ }
+ }
+
+ public String toString() {
+ return "BinaryStorage(" + bufferSize + ")";
+ }
+
+ public boolean equals(Object obj) {
+ return true;
+ }
+}
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Apr 1 14:22:38 2008
@@ -89,4 +89,12 @@
public void finish() throws IOException {
}
+    /**
+     * Two <code>PigStorage</code> instances are equal when they use the same
+     * field delimiter. <code>null</code> and objects of any other type are
+     * never equal (no <code>ClassCastException</code> is thrown).
+     */
+    @Override
+    public boolean equals(Object obj) {
+        return (obj instanceof PigStorage) && equals((PigStorage) obj);
+    }
+
+    /**
+     * @param other storage to compare with; may be <code>null</code>
+     * @return <code>true</code> if <code>other</code> is non-null and uses the
+     *         same field delimiter as this storage
+     */
+    public boolean equals(PigStorage other) {
+        return other != null && this.fieldDel.equals(other.fieldDel);
+    }
+
+    /**
+     * Consistent with {@link #equals(Object)}: derived from the delimiter only.
+     */
+    @Override
+    public int hashCode() {
+        return fieldDel.hashCode();
+    }
+
}
Modified: incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java Tue Apr 1 14:22:38 2008
@@ -20,11 +20,19 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.io.WritableComparator;
/**
- * The basic data unit. For now, we represent all atomic data objects as strings
+ * The basic data unit.
+ *
+ * We represent all atomic data objects as strings or raw-bytes.
*/
final public class DataAtom extends Datum {
+ private enum Type {BINARY, STRING};
+
+ Type type = Type.STRING;
String stringVal = null;
Double doubleVal = null;
public static String EMPTY = "";
@@ -62,6 +70,8 @@
public void setValue(byte[] valIn) {
binaryVal = valIn;
+ type = Type.BINARY;
+
stringVal = null;
doubleVal = Double.POSITIVE_INFINITY;
}
@@ -85,6 +95,22 @@
stringVal = Double.toString(valIn);
}
+    /**
+     * Returns the value of this atom as raw bytes.
+     *
+     * For a binary atom this is the stored byte array itself (not a copy);
+     * for a string atom it is the UTF-8 encoding of the string value.
+     *
+     * @return the atom's bytes, or <code>null</code> for a string atom whose
+     *         value is <code>null</code>
+     */
+    public byte[] getValueBytes() {
+        if (type == Type.BINARY) {
+            return binaryVal;
+        }
+        if (stringVal == null) {
+            return null;
+        }
+        try {
+            return stringVal.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException uee) {
+            // Every JVM must support UTF-8; fail loudly like write() does
+            throw new RuntimeException("UTF-8 encoding is not supported", uee);
+        }
+    }
+
public String strval() {
return stringVal;
}
@@ -120,37 +146,52 @@
return -1;
DataAtom dOther = (DataAtom) other;
- return stringVal.compareTo(dOther.stringVal);
+ return (type == Type.STRING ) ? stringVal.compareTo(dOther.stringVal) :
+ WritableComparator.compareBytes(binaryVal, 0, binaryVal.length,
+ dOther.binaryVal, 0,
+ dOther.binaryVal.length);
}
@Override
public void write(DataOutput out) throws IOException {
out.write(ATOM);
+ out.writeUTF(type.toString());
byte[] data;
- try {
- data = strval().getBytes("UTF-8");
- } catch (Exception e) {
- long size = strval().length();
- throw new RuntimeException("Error dealing with DataAtom of size " + size, e);
+ if (type == Type.BINARY) {
+ data = binaryVal;
+ } else {
+ try {
+ data = strval().getBytes("UTF-8");
+ } catch (Exception e) {
+ long size = strval().length();
+ throw new RuntimeException("Error dealing with DataAtom of size " + size, e);
+ }
}
Tuple.encodeInt(out, data.length);
out.write(data);
}
static DataAtom read(DataInput in) throws IOException {
+ Type type = Type.valueOf(in.readUTF());
int len = Tuple.decodeInt(in);
DataAtom ret = new DataAtom();
byte[] data = new byte[len];
in.readFully(data);
- ret.setValue(new String(data, "UTF-8"));
+ if (type == Type.STRING) {
+ ret.setValue(new String(data, "UTF-8"));
+ }
+ else {
+ ret.setValue(data);
+ }
return ret;
}
@Override
public int hashCode() {
- return stringVal.hashCode();
+ return (type == Type.STRING) ? stringVal.hashCode() :
+ WritableComparator.hashBytes(binaryVal, binaryVal.length);
}
@Override
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Tue Apr 1 14:22:38 2008
@@ -68,16 +68,22 @@
}
properties.setProperty(property, sb.toString());
}
+
+    /**
+     * Get the {@link StreamingCommand} for this <code>StreamSpec</code>.
+     *
+     * @return the <code>StreamingCommand</code> held by this spec
+     */
+    public StreamingCommand getCommand() {
+        return command;
+    }
- @Override
public List<String> getFuncs() {
// No user-defined functions here
return new ArrayList<String>();
}
protected Schema mapInputSchema(Schema schema) {
- // EvalSpec _has_ to have the schema if there is one...
- return null;
+ return schema;
}
protected DataCollector setupDefaultPipe(Properties properties,
@@ -120,7 +126,7 @@
// Start the executable
this.executableManager.run();
} catch (Exception e) {
- LOG.fatal("Failed to create/start PigExecutableManager with: " +
+ LOG.fatal("Failed to create/start ExecutableManager with: " +
e);
e.printStackTrace();
throw new RuntimeException(e);
@@ -142,7 +148,7 @@
executableManager.close();
}
catch (Exception e) {
- LOG.fatal("Failed to close PigExecutableManager with: " + e);
+ LOG.fatal("Failed to close ExecutableManager with: " + e);
throw new RuntimeException(e);
}
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java Tue Apr 1 14:22:38 2008
@@ -19,6 +19,8 @@
import java.util.Iterator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Datum;
import org.apache.pig.data.Tuple;
@@ -31,6 +33,9 @@
* or a file.
*/
public abstract class DataCollector {
+ private static final Log LOG =
+ LogFactory.getLog(DataCollector.class.getName());
+
Integer staleCount = 0;
protected boolean inTheMiddleOfBag = false;
@@ -121,10 +126,21 @@
}
}
- public void finishPipe(){
- finish();
- if (successor!=null)
- successor.finishPipe();
+ public final void finishPipe() {
+ try {
+ finish();
+ } finally {
+ try {
+ if (successor != null) {
+ successor.finishPipe();
+ }
+ } catch (Exception e) {
+ // Ignore this exception since the original is more relevant
+ LOG.debug(e);
+ } finally {
+ successor = null;
+ }
+ }
}
protected void finish(){}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Apr 1 14:22:38 2008
@@ -39,14 +39,17 @@
protected FileSpec inputFileSpec;
protected int outputType = FIXED;
-
+
+ protected boolean splitable = true;
public LOLoad(Map<OperatorKey, LogicalOperator> opTable,
String scope,
long id,
- FileSpec inputFileSpec) throws IOException, ParseException {
+ FileSpec inputFileSpec, boolean splitable)
+ throws IOException, ParseException {
super(opTable, scope, id);
this.inputFileSpec = inputFileSpec;
+ this.splitable = splitable;
// check if we can instantiate load func
LoadFunc storageFunc = (LoadFunc) PigContext
@@ -113,6 +116,10 @@
return funcs;
}
+    /**
+     * Tells whether the input of this load may be divided into several
+     * splits.
+     *
+     * @return the value of the <code>splitable</code> flag (set by the
+     *         constructor)
+     */
+    public boolean isSplitable() {
+        return this.splitable;
+    }
+
public void visit(LOVisitor v) {
v.visitLoad(this);
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Tue Apr 1 14:22:38 2008
@@ -62,6 +62,9 @@
return outputFileSpec;
}
+    /**
+     * Replaces the output file spec of this store.
+     *
+     * @param spec the new output file spec
+     */
+    public void setOutputFileSpec(FileSpec spec) {
+        this.outputFileSpec = spec;
+    }
@Override
public String toString() {
Added: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/Optimizer.java?rev=643583&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/Optimizer.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/Optimizer.java Tue Apr 1 14:22:38 2008
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+
+/**
+ * Base class for simple {@link LogicalPlan} optimizers.
+ *
+ * An <code>Optimizer</code> is an {@link LOVisitor}: implementations
+ * <em>visit</em> the nodes of a <code>LogicalPlan</code> and then optimize
+ * the plan where they can.
+ */
+public abstract class Optimizer extends LOVisitor {
+    /**
+     * Optimizes the given {@link LogicalPlan} if feasible.
+     *
+     * @param root root of the {@link LogicalPlan} to optimize
+     * @return <code>true</code> if an optimization was feasible and was
+     *         applied, <code>false</code> otherwise
+     */
+    public abstract boolean optimize(LogicalPlan root);
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=643583&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java Tue Apr 1 14:22:38 2008
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer.streaming;
+
+import java.util.List;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.builtin.BinaryStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.StreamSpec;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOEval;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link LoadOptimizer} tries to optimize away the deserialization done by Pig
+ * for the simple case of a LOAD followed by a STREAM operator; both with the
+ * equivalent {@link LoadFunc} specifications.
+ *
+ * In such cases it is safe to replace the equivalent <code>LoadFunc</code>
+ * specifications with a {@link BinaryStorage} which doesn't interpret the
+ * input bytes at all.
+ *
+ * The rewrite is only attempted when the LOAD is not splitable (its input
+ * files are processed as-is, i.e. <code>split by 'file'</code>) and the
+ * streaming command declares its input specification explicitly. Without an
+ * explicit {@link HandleSpec} there is nothing in the command that could be
+ * switched to <code>BinaryStorage</code>, and flipping only the LOAD would
+ * leave the two sides disagreeing about the data format.
+ */
+public class LoadOptimizer extends Optimizer {
+    /** Set once at least one LOAD/STREAM pair has been rewritten. */
+    boolean optimize = false;
+    /**
+     * <code>true</code> right after a LOAD has been visited, i.e. while the
+     * operator currently being examined has {@link #load} as its input.
+     */
+    boolean parentLoad = false;
+    /** The LOAD visited most recently; only meaningful if parentLoad is set. */
+    LOLoad load = null;
+
+    @Override
+    public void visitCogroup(LOCogroup g) {
+        super.visitCogroup(g);
+        parentLoad = false;
+    }
+
+    @Override
+    public void visitEval(LOEval e) {
+        // super.visitEval() is expected to visit e's input first, so that
+        // parentLoad tells whether that input is a LOAD
+        super.visitEval(e);
+
+        if (parentLoad) {
+            optimizeLoadStream(e);
+        }
+
+        parentLoad = false;
+    }
+
+    @Override
+    public void visitLoad(LOLoad load) {
+        super.visitLoad(load);
+        parentLoad = true;
+        this.load = load;
+    }
+
+    @Override
+    public void visitSort(LOSort s) {
+        super.visitSort(s);
+        parentLoad = false;
+    }
+
+    @Override
+    public void visitSplit(LOSplit s) {
+        super.visitSplit(s);
+        parentLoad = false;
+    }
+
+    @Override
+    public void visitSplitOutput(LOSplitOutput s) {
+        super.visitSplitOutput(s);
+        parentLoad = false;
+    }
+
+    @Override
+    public void visitStore(LOStore s) {
+        super.visitStore(s);
+        parentLoad = false;
+    }
+
+    @Override
+    public void visitUnion(LOUnion u) {
+        super.visitUnion(u);
+        parentLoad = false;
+    }
+
+    /**
+     * Visits the plan starting at its root and rewrites every eligible
+     * LOAD/STREAM pair.
+     *
+     * @param root the {@link LogicalPlan} to optimize
+     * @return <code>true</code> if at least one pair was rewritten during
+     *         this call
+     */
+    @Override
+    public boolean optimize(LogicalPlan root) {
+        // Reset per-run state so results of an earlier run don't leak into
+        // this one if the optimizer instance is reused
+        optimize = false;
+        parentLoad = false;
+        load = null;
+
+        LogicalOperator r = root.getOpTable().get(root.getRoot());
+        r.visit(this);
+        return optimize;
+    }
+
+    /**
+     * Switches {@link #load} and the stream's input spec to
+     * {@link BinaryStorage} if <code>e</code> is a STREAM fed by a
+     * non-splitable LOAD and both use equal functions.
+     *
+     * @param e the operator whose input is {@link #load}
+     */
+    private void optimizeLoadStream(LOEval e) {
+        EvalSpec spec = e.getSpec();
+        if (!(spec instanceof StreamSpec) || load.isSplitable()) {
+            return;
+        }
+
+        StreamingCommand command = ((StreamSpec)spec).getCommand();
+        List<HandleSpec> inputSpecs = command.getHandleSpecs(Handle.INPUT);
+        if (inputSpecs == null || inputSpecs.isEmpty()) {
+            // The command uses its default input spec; a HandleSpec created
+            // here would not be attached to the command, so the stream could
+            // not be switched to BinaryStorage - leave the plan untouched.
+            return;
+        }
+        HandleSpec streamInputSpec = inputSpecs.get(0);
+
+        FileSpec loadFileSpec = load.getInputFileSpec();
+
+        // Instantiate both functions to compare them for equality. equals()
+        // needs no cast; comparing as Object also avoids a
+        // ClassCastException should either function not be a LoadFunc.
+        Object streamFunc =
+            PigContext.instantiateFuncFromSpec(streamInputSpec.getSpec());
+        Object loadFunc =
+            PigContext.instantiateFuncFromSpec(loadFileSpec.getFuncSpec());
+
+        if (streamFunc.equals(loadFunc)) {
+            // Since they both are the same, we can flip them
+            // for BinaryStorage
+            load.setInputFileSpec(new FileSpec(loadFileSpec.getFileName(),
+                                               BinaryStorage.class.getName()));
+            streamInputSpec.setSpec(BinaryStorage.class.getName());
+
+            optimize = true;
+        }
+    }
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=643583&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java Tue Apr 1 14:22:38 2008
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer.streaming;
+
+import java.util.List;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.builtin.BinaryStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.StreamSpec;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOEval;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link StoreOptimizer} tries to optimize away the (de)serialization done by
+ * Pig for the simple case of a STREAM followed by a STORE operator; both with
+ * equivalent {@link StoreFunc} specifications.
+ *
+ * In such cases it is safe to replace the <code>StoreFunc</code>
+ * specifications with a {@link BinaryStorage} which doesn't interpret the
+ * output bytes at all.
+ *
+ * The rewrite is only attempted when the streaming command declares its
+ * output specification explicitly. Without an explicit {@link HandleSpec}
+ * there is nothing in the command that could be switched to
+ * <code>BinaryStorage</code>, and flipping only the STORE would leave the
+ * two sides disagreeing about the data format.
+ */
+public class StoreOptimizer extends Optimizer {
+    /** Set once at least one STREAM/STORE pair has been rewritten. */
+    boolean optimize = false;
+    /**
+     * <code>true</code> right after an EVAL has been visited, i.e. while the
+     * operator currently being examined has {@link #eval} as its input.
+     */
+    boolean parentEval = false;
+    /** The EVAL visited most recently; only meaningful if parentEval is set. */
+    LOEval eval = null;
+
+    @Override
+    public void visitCogroup(LOCogroup g) {
+        super.visitCogroup(g);
+        parentEval = false;
+    }
+
+    @Override
+    public void visitEval(LOEval e) {
+        super.visitEval(e);
+        eval = e;
+        parentEval = true;
+    }
+
+    @Override
+    public void visitLoad(LOLoad load) {
+        super.visitLoad(load);
+        parentEval = false;
+    }
+
+    @Override
+    public void visitSort(LOSort s) {
+        super.visitSort(s);
+        parentEval = false;
+    }
+
+    @Override
+    public void visitSplit(LOSplit s) {
+        super.visitSplit(s);
+        parentEval = false;
+    }
+
+    @Override
+    public void visitSplitOutput(LOSplitOutput s) {
+        super.visitSplitOutput(s);
+        parentEval = false;
+    }
+
+    @Override
+    public void visitStore(LOStore s) {
+        // super.visitStore() is expected to visit s's input first, so that
+        // parentEval tells whether that input is an EVAL
+        super.visitStore(s);
+
+        if (parentEval) {
+            optimizeStreamStore(s);
+        }
+
+        parentEval = false;
+    }
+
+    @Override
+    public void visitUnion(LOUnion u) {
+        super.visitUnion(u);
+        parentEval = false;
+    }
+
+    /**
+     * Visits the plan starting at its root and rewrites every eligible
+     * STREAM/STORE pair.
+     *
+     * @param root the {@link LogicalPlan} to optimize
+     * @return <code>true</code> if at least one pair was rewritten during
+     *         this call
+     */
+    @Override
+    public boolean optimize(LogicalPlan root) {
+        // Reset per-run state so results of an earlier run don't leak into
+        // this one if the optimizer instance is reused
+        optimize = false;
+        parentEval = false;
+        eval = null;
+
+        LogicalOperator r = root.getOpTable().get(root.getRoot());
+        r.visit(this);
+        return optimize;
+    }
+
+    /**
+     * Switches <code>s</code> and the stream's output spec to
+     * {@link BinaryStorage} if {@link #eval} is a STREAM whose output
+     * function equals the STORE's function.
+     *
+     * @param s the STORE whose input is {@link #eval}
+     */
+    private void optimizeStreamStore(LOStore s) {
+        EvalSpec spec = eval.getSpec();
+        if (!(spec instanceof StreamSpec)) {
+            return;
+        }
+
+        StreamingCommand command = ((StreamSpec)spec).getCommand();
+        List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+        if (outputSpecs == null || outputSpecs.isEmpty()) {
+            // The command uses its default output spec; a HandleSpec created
+            // here would not be attached to the command, so the stream could
+            // not be switched to BinaryStorage - leave the plan untouched.
+            return;
+        }
+        HandleSpec streamOutputSpec = outputSpecs.get(0);
+
+        FileSpec storeFileSpec = s.getOutputFileSpec();
+
+        // Instantiate both functions to compare them for equality. equals()
+        // needs no cast; comparing as Object also avoids a
+        // ClassCastException should either function not be a StoreFunc.
+        Object streamFunc =
+            PigContext.instantiateFuncFromSpec(streamOutputSpec.getSpec());
+        Object storeFunc =
+            PigContext.instantiateFuncFromSpec(storeFileSpec.getFuncSpec());
+
+        if (streamFunc.equals(storeFunc)) {
+            // Since they both are the same, we can flip them
+            // for BinaryStorage
+            s.setOutputFileSpec(new FileSpec(storeFileSpec.getFileName(),
+                                             BinaryStorage.class.getName()));
+            streamOutputSpec.setSpec(BinaryStorage.class.getName());
+
+            optimize = true;
+        }
+    }
+}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Apr 1 14:22:38 2008
@@ -662,8 +662,8 @@
{return op;}
}
-LogicalOperator LoadClause() : {Token t1, t2; String filename; String funcName,funcArgs, funcSpec=null;
- LOLoad lo=null; boolean continuous=false;}
+LogicalOperator LoadClause() : {Token t1, t2, t3; String filename; String funcName,funcArgs, funcSpec=null;
+ LOLoad lo=null; boolean continuous=false; String splitBy; boolean splitable = true;}
{
( filename = FileName()
(
@@ -672,6 +672,15 @@
funcSpec = funcName + "(" + funcArgs + ")";
}
)?
+ (
+ <SPLIT> <BY> t3 = <QUOTEDSTRING>
+ {
+ splitBy = unquote(t3.image);
+ if (splitBy.equalsIgnoreCase("file")) {
+ splitable = false;
+ }
+ }
+ )?
)
[ <CONTINUOUSLY> {continuous=true;} ]
{
@@ -680,22 +689,33 @@
funcSpec += continuous ? "('\t','\n','0')" : "()";
}
- lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext, true), funcSpec));
+ lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext, true), funcSpec), splitable);
if (continuous)
lo.setOutputType(LogicalOperator.MONOTONE);
return lo;
}
}
-String StringList() : {StringBuilder sb = new StringBuilder(); Token t;}
+String StringList() : {StringBuilder sb = new StringBuilder(); Token t; String arg;}
{
(
+ LOOKAHEAD(2)
(
t = <QUOTEDSTRING> {sb.append(t.image);}
- ( "," t = <QUOTEDSTRING> {sb.append(",");sb.append(t.image);} )*
- )
- | {}
- )
+ |
+ t = <NUMBER> {sb.append(t.image);}
+ )
+ (
+ ","
+ (
+ t = <QUOTEDSTRING> {sb.append(t.image);}
+ |
+ t = <NUMBER> {sb.append(t.image);}
+ )
+ )*
+ |
+ {}
+ )
{return sb.toString();}
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Tue Apr 1 14:22:38 2008
@@ -33,6 +33,7 @@
import org.apache.pig.data.Datum;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.streaming.InputHandler.InputType;
import org.apache.pig.impl.streaming.OutputHandler.OutputType;
@@ -74,11 +75,29 @@
Properties properties;
+ // Statistics
+ protected long inputRecords = 0;
protected long inputBytes = 0;
+ protected long outputRecords = 0;
protected long outputBytes = 0;
+ /**
+ * Create a new {@link ExecutableManager}.
+ */
public ExecutableManager() {}
+ /**
+ * Configure and initialize the {@link ExecutableManager}.
+ *
+ * @param properties {@link Properties} for the
+ * <code>ExecutableManager</code>
+ * @param command {@link StreamingCommand} to be run by the
+ * <code>ExecutableManager</code>
+ * @param endOfPipe {@link DataCollector} to be used to push results of the
+ * <code>StreamingCommand</code> down
+ * @throws IOException
+ * @throws ExecException
+ */
public void configure(Properties properties, StreamingCommand command,
DataCollector endOfPipe)
throws IOException, ExecException {
@@ -96,51 +115,82 @@
this.endOfPipe = endOfPipe;
}
+ /**
+ * Close and cleanup the {@link ExecutableManager}.
+ *
+ * @throws IOException
+ * @throws ExecException
+ */
public void close() throws IOException, ExecException {
- // Close the InputHandler, which in some cases lets the process
- // terminate
- inputHandler.close();
-
- // Check if we need to start the process now ...
- if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
- exec();
- }
-
- // Wait for the process to exit and the stdout/stderr threads to complete
- try {
- exitCode = process.waitFor();
-
- if (stdoutThread != null) {
- stdoutThread.join(0);
- }
- if (stderrThread != null) {
- stderrThread.join(0);
- }
+ try {
+ // Close the InputHandler, which in some cases lets the process
+ // terminate
+ inputHandler.close();
+
+ // Check if we need to start the process now ...
+ if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
+ exec();
+ }
- } catch (InterruptedException ie) {}
+ // Wait for the process to exit
+ try {
+ exitCode = process.waitFor();
+ } catch (InterruptedException ie) {}
+
+ // Wait for stdout thread to complete
+ try {
+ if (stdoutThread != null) {
+ stdoutThread.join(0);
+ }
+ stdoutThread = null;
+ } catch (InterruptedException ie) {}
+
+ // Wait for stderr thread to complete
+ try {
+ if (stderrThread != null) {
+ stderrThread.join(0);
+ }
+ stderrThread = null;
+ } catch (InterruptedException ie) {}
+
+ // Clean up the process
+ process.destroy();
+ process = null;
+
+ LOG.debug("Process exited with: " + exitCode);
+ if (exitCode != SUCCESS) {
+ throw new ExecException(command + " failed with exit status: " +
+ exitCode);
+ }
- // Clean up the process
- process.destroy();
-
- LOG.debug("Process exited with: " + exitCode);
- if (exitCode != SUCCESS) {
- throw new ExecException(command + " failed with exit status: " +
- exitCode);
- }
-
- if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
- // Trigger the outputHandler
- outputHandler.bindTo(null);
+ if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
+ // Trigger the outputHandler
+ outputHandler.bindTo("", null, 0, -1);
+
+ // Start the thread to process the output and wait for
+ // it to terminate
+ stdoutThread = new ProcessOutputThread(outputHandler);
+ stdoutThread.start();
+
+ try {
+ stdoutThread.join(0);
+ } catch (InterruptedException ie) {}
+ }
+ } finally {
+ // Cleanup, release resources ...
+ if (process != null) {
+ process.destroy();
+ }
- // Start the thread to process the output and wait for
- // it to terminate
- stdoutThread = new ProcessOutputThread(outputHandler);
- stdoutThread.start();
-
- try {
- stdoutThread.join(0);
- } catch (InterruptedException ie) {}
- }
+ if (stdoutThread != null) {
+ stdoutThread.interrupt();
+ }
+
+ if (stderrThread != null) {
+ stderrThread.interrupt();
+ }
+ }
+
}
@@ -172,7 +222,8 @@
new DataInputStream(new BufferedInputStream(process.getInputStream()));
// Bind the stdout to the OutputHandler
- outputHandler.bindTo(stdout);
+ outputHandler.bindTo("", new BufferedPositionedInputStream(stdout),
+ 0, Long.MAX_VALUE);
// Start the thread to process the executable's stdout
stdoutThread = new ProcessOutputThread(outputHandler);
@@ -180,6 +231,11 @@
}
}
+ /**
+ * Start execution of the {@link ExecutableManager}.
+ *
+ * @throws IOException
+ */
public void run() throws IOException {
// Check if we need to exec the process NOW ...
if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
@@ -193,11 +249,19 @@
inputHandler.bindTo(stdin);
}
+ /**
+ * Send the given {@link Datum} to the external command managed by the
+ * {@link ExecutableManager}.
+ *
+ * @param d <code>Datum</code> to be sent to the external command
+ * @throws IOException
+ */
public void add(Datum d) throws IOException {
// Pass the serialized tuple to the executable via the InputHandler
Tuple t = (Tuple)d;
inputHandler.putNext(t);
inputBytes += t.getMemorySize();
+ inputRecords++;
}
/**
@@ -210,6 +274,7 @@
*/
protected void processOutput(Datum d) {
endOfPipe.add(d);
+ outputRecords++;
}
class ProcessOutputThread extends Thread {
Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java Tue Apr 1 14:22:38 2008
@@ -20,11 +20,11 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
/**
@@ -35,7 +35,7 @@
public class FileOutputHandler extends OutputHandler {
String fileName;
- InputStream fileInStream;
+ BufferedPositionedInputStream fileInStream;
public FileOutputHandler(HandleSpec handleSpec) throws ExecException {
fileName = handleSpec.name;
@@ -47,10 +47,15 @@
return OutputType.ASYNCHRONOUS;
}
- public void bindTo(InputStream is) throws IOException {
+ public void bindTo(String fileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
// This is a trigger to start processing the output from the file ...
- fileInStream = new FileInputStream(new File(fileName));
- super.bindTo(fileInStream);
+ // ... however, we must ignore the input parameters and use ones
+ // provided during initialization
+ File file = new File(this.fileName);
+ this.fileInStream =
+ new BufferedPositionedInputStream(new FileInputStream(file));
+ super.bindTo(this.fileName, this.fileInStream, 0, file.length());
}
public void close() throws IOException {
Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Tue Apr 1 14:22:38 2008
@@ -18,7 +18,6 @@
package org.apache.pig.impl.streaming;
import java.io.IOException;
-import java.io.InputStream;
import org.apache.pig.LoadFunc;
import org.apache.pig.data.Tuple;
@@ -58,9 +57,10 @@
* of the managed process
* @throws IOException
*/
- public void bindTo(InputStream is) throws IOException {
- deserializer.bindTo("", new BufferedPositionedInputStream(is), 0,
- Long.MAX_VALUE);
+ public void bindTo(String fileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+ deserializer.bindTo(fileName, new BufferedPositionedInputStream(is),
+ offset, end);
}
/**
Added: incubator/pig/trunk/test/org/apache/pig/test/TestBinaryStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestBinaryStorage.java?rev=643583&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestBinaryStorage.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestBinaryStorage.java Tue Apr 1 14:22:38 2008
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.PigServer;
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Checks that <code>BinaryStorage</code> passes data through a streaming
+ * command unchanged, for both ascii and random binary input.
+ */
+public class TestBinaryStorage extends TestCase {
+    private static final String simpleEchoStreamingCommand =
+        "perl -ne 'print \"$_\"'";
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+    /** Number of random bytes written for the binary-data test. */
+    private static final int MAX_DATA_SIZE = 1024;
+
+    @Test
+    public void testBinaryStorageWithAsciiData() throws Exception {
+        // Create input file with ascii data
+        File input = Util.createInputFile("tmp", "",
+                                          new String[] {"A,1", "B,2", "C,3", "D,2",
+                                                        "A,5", "B,5", "C,8", "A,8",
+                                                        "D,8", "A,9"});
+
+        // Test if data is handled correctly by BinaryStorage
+        testBinaryStorage(input);
+    }
+
+    @Test
+    public void testBinaryStorageWithBinaryData() throws Exception {
+        // Create input file and fill it with random binary data
+        File input = File.createTempFile("tmp", "dat");
+        input.deleteOnExit();
+        byte[] data = new byte[MAX_DATA_SIZE];
+        randomizeBytes(data, 0, data.length);
+
+        FileOutputStream os = new FileOutputStream(input);
+        try {
+            os.write(data, 0, data.length);
+        } finally {
+            os.close();
+        }
+
+        // Test if data is handled correctly by BinaryStorage
+        testBinaryStorage(input);
+    }
+
+    /**
+     * Streams <code>input</code> through an echo command, using
+     * <code>BinaryStorage</code> for load, stream input/output and store, and
+     * checks that the stored output is byte-for-byte identical to the input.
+     */
+    private void testBinaryStorage(File input)
+    throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
+
+        // Get the complete input data
+        byte[] inputData = readFully(new FileInputStream(input));
+
+        // Pig query to run
+        pigServer.registerQuery("DEFINE CMD `" +
+                                simpleEchoStreamingCommand + "` " +
+                                "input(stdin using BinaryStorage()) " +
+                                "output(stdout using BinaryStorage());");
+        pigServer.registerQuery("IP = load 'file:" + input + "' using " +
+                                "BinaryStorage() split by 'file';");
+        pigServer.registerQuery("OP = stream IP through CMD;");
+
+        // Save the output using BinaryStorage; first remove any output left
+        // behind by an earlier, failed run
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        try {
+            pigServer.store("OP", output, "BinaryStorage()");
+
+            // Get the complete output data, so that extra trailing bytes
+            // are detected too
+            byte[] outputData =
+                readFully(FileLocalizer.open(output, pigServer.getPigContext()));
+
+            // Check if the data is the same ...
+            assertEquals(inputData.length, outputData.length);
+            assertEquals(0,
+                         WritableComparator.compareBytes(inputData, 0, inputData.length,
+                                                         outputData, 0, outputData.length));
+        } finally {
+            // Cleanup
+            pigServer.deleteFile(output);
+        }
+    }
+
+    /** Fills <code>data[offset .. offset+length)</code> with random bytes. */
+    private static void randomizeBytes(byte[] data, int offset, int length) {
+        Random random = new Random();
+        for(int i=offset + length - 1; i >= offset; --i) {
+            data[i] = (byte) random.nextInt(256);
+        }
+    }
+
+    /**
+     * Reads <code>is</code> until end-of-stream and closes it.
+     *
+     * @return all bytes read from the stream
+     */
+    private static byte[] readFully(InputStream is) throws IOException {
+        try {
+            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+            byte[] buffer = new byte[4096];
+            int n;
+            while ((n = is.read(buffer)) != -1) {
+                bytes.write(buffer, 0, n);
+            }
+            return bytes.toByteArray();
+        } finally {
+            is.close();
+        }
+    }
+}
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Tue Apr 1 14:22:38 2008
@@ -26,6 +26,7 @@
import org.junit.Test;
import org.apache.pig.PigServer;
+import org.apache.pig.PigServer.ExecType;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.*;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -56,8 +57,18 @@
}
@Test
- public void testSimpleMapSideStreaming() throws Exception {
- PigServer pigServer = new PigServer(MAPREDUCE);
+ public void testLocalSimpleMapSideStreaming() throws Exception {
+ testSimpleMapSideStreaming(ExecType.LOCAL);
+ }
+
+ @Test
+ public void testMRSimpleMapSideStreaming() throws Exception {
+ testSimpleMapSideStreaming(ExecType.MAPREDUCE);
+ }
+
+ private void testSimpleMapSideStreaming(ExecType execType)
+ throws Exception {
+ PigServer pigServer = new PigServer(execType);
File input = Util.createInputFile("tmp", "",
new String[] {"A,1", "B,2", "C,3", "D,2",
@@ -74,7 +85,9 @@
pigServer.registerQuery("IP = load 'file:" + input + "' using " +
PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
- pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
+ pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
+ simpleEchoStreamingCommand + "`;");
+ pigServer.registerQuery("OP = stream S1 through `" +
simpleEchoStreamingCommand + "`;");
// Run the query and check the results
@@ -82,8 +95,20 @@
}
@Test
- public void testSimpleMapSideStreamingWithOutputSchema() throws Exception {
- PigServer pigServer = new PigServer(MAPREDUCE);
+ public void testLocalSimpleMapSideStreamingWithOutputSchema()
+ throws Exception {
+ testSimpleMapSideStreamingWithOutputSchema(ExecType.LOCAL);
+ }
+
+ @Test
+ public void testMRSimpleMapSideStreamingWithOutputSchema()
+ throws Exception {
+ testSimpleMapSideStreamingWithOutputSchema(ExecType.MAPREDUCE);
+ }
+
+ private void testSimpleMapSideStreamingWithOutputSchema(ExecType execType)
+ throws Exception {
+ PigServer pigServer = new PigServer(execType);
File input = Util.createInputFile("tmp", "",
new String[] {"A,1", "B,2", "C,3", "D,2",
@@ -109,8 +134,20 @@
}
@Test
- public void testSimpleReduceSideStreamingAfterFlatten() throws Exception {
- PigServer pigServer = new PigServer(MAPREDUCE);
+ public void testLocalSimpleReduceSideStreamingAfterFlatten()
+ throws Exception {
+ testSimpleReduceSideStreamingAfterFlatten(ExecType.LOCAL);
+ }
+
+ @Test
+ public void testMRSimpleReduceSideStreamingAfterFlatten()
+ throws Exception {
+ testSimpleReduceSideStreamingAfterFlatten(ExecType.MAPREDUCE);
+ }
+
+ private void testSimpleReduceSideStreamingAfterFlatten(ExecType execType)
+ throws Exception {
+ PigServer pigServer = new PigServer(execType);
File input = Util.createInputFile("tmp", "",
new String[] {"A,1", "B,2", "C,3", "D,2",
@@ -130,7 +167,9 @@
pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;");
pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " +
"generate flatten($1);");
- pigServer.registerQuery("OP = stream FLATTENED_GROUPED_DATA through `" +
+ pigServer.registerQuery("S1 = stream FLATTENED_GROUPED_DATA through `" +
+ simpleEchoStreamingCommand + "`;");
+ pigServer.registerQuery("OP = stream S1 through `" +
simpleEchoStreamingCommand + "`;");
// Run the query and check the results
@@ -138,8 +177,20 @@
}
@Test
- public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws Exception {
- PigServer pigServer = new PigServer(MAPREDUCE);
+ public void testLocalSimpleOrderedReduceSideStreamingAfterFlatten()
+ throws Exception {
+ testSimpleOrderedReduceSideStreamingAfterFlatten(ExecType.LOCAL);
+ }
+
+ @Test
+ public void testMRSimpleOrderedReduceSideStreamingAfterFlatten()
+ throws Exception {
+ testSimpleOrderedReduceSideStreamingAfterFlatten(ExecType.MAPREDUCE);
+ }
+
+ private void testSimpleOrderedReduceSideStreamingAfterFlatten(
+ ExecType execType) throws Exception {
+ PigServer pigServer = new PigServer(execType);
File input = Util.createInputFile("tmp", "",
new String[] {"A,1,2,3", "B,2,4,5",
@@ -169,12 +220,18 @@
pigServer.registerQuery("IP = load 'file:" + input + "' using " +
PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+ pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
+ simpleEchoStreamingCommand + "`;");
+ pigServer.registerQuery("S2 = stream S1 through `" +
+ simpleEchoStreamingCommand + "`;");
pigServer.registerQuery("GROUPED_DATA = group IP by $0;");
pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { " +
" D = order IP BY $2, $3;" +
" generate flatten(D);" +
"};");
- pigServer.registerQuery("OP = stream ORDERED_DATA through `" +
+ pigServer.registerQuery("S3 = stream ORDERED_DATA through `" +
+ simpleEchoStreamingCommand + "`;");
+ pigServer.registerQuery("OP = stream S3 through `" +
simpleEchoStreamingCommand + "`;");
// Run the query and check the results
@@ -224,6 +281,7 @@
pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
String output = "/pig/out";
+ pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
@@ -237,9 +295,6 @@
// Run the query and check the results
Util.checkQueryOutputs(outputs.iterator(), expectedResults);
-
- // Cleanup
- pigServer.deleteFile(output);
}
@Test
@@ -286,6 +341,7 @@
pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
String output = "/pig/out";
+ pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
@@ -299,9 +355,6 @@
// Run the query and check the results
Util.checkQueryOutputs(outputs.iterator(), expectedResults);
-
- // Cleanup
- pigServer.deleteFile(output);
}
@Test
@@ -351,6 +404,7 @@
pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
String output = "/pig/out";
+ pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
InputStream op = FileLocalizer.open(output, pigServer.getPigContext());