You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/09/17 21:14:28 UTC
svn commit: r1524181 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/builtin/ src/org/apache/pig/data/
src/org/apache/pig/impl/streaming/ src/org/apache/pig/impl/util/
Author: rohini
Date: Tue Sep 17 19:14:28 2013
New Revision: 1524181
URL: http://svn.apache.org/r1524181
Log:
PIG-3235: Avoid extra byte array copies in streaming (rohini)
Added:
pig/trunk/src/org/apache/pig/PigStreamingBase.java
pig/trunk/src/org/apache/pig/data/WritableByteArray.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigToStream.java
pig/trunk/src/org/apache/pig/StreamToPig.java
pig/trunk/src/org/apache/pig/builtin/PigStreaming.java
pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java
pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Sep 17 19:14:28 2013
@@ -30,6 +30,8 @@ PIG-3174: Remove rpm and deb artifacts f
IMPROVEMENTS
+PIG-3235: Avoid extra byte array copies in streaming (rohini)
+
PIG-3065: pig output format/committer should support recovery for hadoop 0.23 (daijy)
PIG-3390: Make pig working with HBase 0.95 (jarcec via daijy)
Added: pig/trunk/src/org/apache/pig/PigStreamingBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigStreamingBase.java?rev=1524181&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/PigStreamingBase.java (added)
+++ pig/trunk/src/org/apache/pig/PigStreamingBase.java Tue Sep 17 19:14:28 2013
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.WritableByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Abstract base class that combines {@link PigToStream} and
+ * {@link StreamToPig}. It is used for the custom mapping of
+ * - a {@link Tuple} to a byte array. The byte array is fed to the stdin
+ * of the streaming process.
+ * - a byte array, received from the stdout of the streaming process,
+ * to a {@link Tuple}.
+ *
+ * This class is designed to provide a common protocol for data exchange
+ * between Pig runtime and streaming executables.
+ *
+ * Subclasses implement {@link #serializeToBytes(Tuple)} and
+ * {@link #deserialize(byte[], int, int)}. The deprecated single-argument
+ * methods inherited from the two interfaces are final here and simply
+ * delegate to them:
+ * - <code>serialize(Tuple)</code> calls <code>serializeToBytes</code> and
+ * returns the backing array of the result as-is when that array is exactly
+ * as long as the valid data (no copy is made, so the caller may be handed
+ * the serializer's own buffer); otherwise it returns a copy of the valid
+ * prefix.
+ * - <code>deserialize(byte[])</code> calls the three-argument form with
+ * offset 0 and the full array length.
+ *
+ * Typically, a user extends this class for a particular type of stream
+ * command and specifies the implementation class in the Pig DEFINE statement.
+ *
+ * @since Pig 0.12
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class PigStreamingBase implements PigToStream, StreamToPig {
+
+ @Override
+ @Deprecated
+ public final byte[] serialize(Tuple t) throws IOException {
+ WritableByteArray data = serializeToBytes(t);
+ if (data.getLength() == data.getData().length) {
+ return data.getData();
+ } else {
+ byte[] buf = new byte[data.getLength()];
+ System.arraycopy(data.getData(), 0, buf, 0, data.getLength());
+ return buf;
+ }
+ }
+
+ /**
+ * Given a tuple, produce the bytes to be passed to the streaming
+ * executable.
+ * @param t Tuple to serialize
+ * @return buffer holding the serialized form of the tuple; only the first
+ * {@link WritableByteArray#getLength()} bytes of
+ * {@link WritableByteArray#getData()} are valid
+ * @throws IOException if serialization fails
+ */
+ public abstract WritableByteArray serializeToBytes(Tuple t) throws IOException;
+
+ @Override
+ @Deprecated
+ public final Tuple deserialize(byte[] bytes) throws IOException {
+ return deserialize(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Given a region of a byte array from a streaming executable, produce a tuple.
+ * @param bytes bytes to deserialize.
+ * @param offset the offset in the byte array from which to deserialize.
+ * @param length the number of bytes from the offset of the byte array to deserialize.
+ * @return Data as a Pig Tuple.
+ * @throws IOException if deserialization fails
+ */
+ public abstract Tuple deserialize(byte[] bytes, int offset, int length) throws IOException;
+
+ /**
+ * This will be called on the front end during planning and not on the back
+ * end during execution.
+ *
+ * @return the {@link LoadCaster} associated with this object, or null if
+ * there is no such LoadCaster.
+ * @throws IOException if there is an exception while obtaining the LoadCaster
+ */
+ @Override
+ public abstract LoadCaster getLoadCaster() throws IOException;
+
+}
Modified: pig/trunk/src/org/apache/pig/PigToStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigToStream.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigToStream.java (original)
+++ pig/trunk/src/org/apache/pig/PigToStream.java Tue Sep 17 19:14:28 2013
@@ -26,20 +26,26 @@ import org.apache.pig.data.Tuple;
/**
* The interface used for the custom mapping of a {@link Tuple} to a byte
* array. The byte array is fed to the stdin of the streaming process.
- *
+ *
* This interface, together with {@link StreamToPig}, is designed to provide
- * a common protocol for data exchange between Pig runtime and streaming
+ * a common protocol for data exchange between Pig runtime and streaming
* executables.
- *
- * Typically, a user implements this interface for a particular type of
+ *
+ * Typically, a user implements this interface for a particular type of
* stream command and specifies the implementation class in the Pig DEFINE
- * statement.
- * @since Pig 0.7
+ * statement.
+ * @since Pig 0.7
*/
+
+/**
+ * @deprecated Use {@link org.apache.pig.PigStreamingBase}
+ */
+
+@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface PigToStream {
-
+
/**
* Given a tuple, produce an array of bytes to be passed to the streaming
* executable.
Modified: pig/trunk/src/org/apache/pig/StreamToPig.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StreamToPig.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StreamToPig.java (original)
+++ pig/trunk/src/org/apache/pig/StreamToPig.java Tue Sep 17 19:14:28 2013
@@ -25,21 +25,27 @@ import org.apache.pig.data.Tuple;
/**
* The interface is used for the custom mapping of a byte array, received from
- * the stdout of the streaming process, to a {@link Tuple}.
- *
+ * the stdout of the streaming process, to a {@link Tuple}.
+ *
* This interface, together with {@link PigToStream}, is designed to provide
- * a common protocol for data exchange between Pig runtime and streaming
+ * a common protocol for data exchange between Pig runtime and streaming
* executables.
- *
- * The method <code>getLoadCaster</code> is called by Pig to convert the
+ *
+ * The method <code>getLoadCaster</code> is called by Pig to convert the
* fields in the byte array to typed fields of the Tuple based on a given
* schema.
- *
- * Typically, user implements this interface for a particular type of
+ *
+ * Typically, user implements this interface for a particular type of
* stream command and specifies the implementation class in the Pig DEFINE
- * statement.
+ * statement.
* @since Pig 0.7
*/
+
+/**
+ * @deprecated Use {@link org.apache.pig.PigStreamingBase}
+ */
+
+@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface StreamToPig {
@@ -51,12 +57,12 @@ public interface StreamToPig {
public Tuple deserialize(byte[] bytes) throws IOException;
/**
- * This will be called on the front end during planning and not on the back
+ * This will be called on the front end during planning and not on the back
* end during execution.
- *
+ *
* @return the {@link LoadCaster} associated with this object, or null if
- * there is no such LoadCaster.
- * @throws IOException if there is an exception during LoadCaster
+ * there is no such LoadCaster.
+ * @throws IOException if there is an exception during LoadCaster
*/
public LoadCaster getLoadCaster() throws IOException;
Modified: pig/trunk/src/org/apache/pig/builtin/PigStreaming.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStreaming.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStreaming.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStreaming.java Tue Sep 17 19:14:28 2013
@@ -17,41 +17,39 @@
*/
package org.apache.pig.builtin;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import org.apache.hadoop.io.Text;
import org.apache.pig.LoadCaster;
-import org.apache.pig.PigToStream;
-import org.apache.pig.StreamToPig;
+import org.apache.pig.PigStreamingBase;
+import org.apache.pig.data.WritableByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.StorageUtil;
/**
- * The default implementation of {@link PigToStream} and {@link StreamToPig}
- * interfaces. It converts tuples into <code>fieldDel</code> separated lines
+ * The default implementation of {@link PigStreamingBase}.
+ * It converts tuples into <code>fieldDel</code> separated lines
* and <code>fieldDel</code> separated lines into tuples.
- *
+ *
*/
-public class PigStreaming implements PigToStream, StreamToPig {
+public class PigStreaming extends PigStreamingBase {
private byte recordDel = '\n';
-
+
private byte fieldDel = '\t';
-
- private ByteArrayOutputStream out;
-
+
+ private WritableByteArray out;
+
/**
* The constructor that uses the default field delimiter.
*/
public PigStreaming() {
- out = new ByteArrayOutputStream();
+ out = new WritableByteArray();
}
-
+
/**
* The constructor that accepts a user-specified field
* delimiter.
- *
+ *
* @param delimiter a <code>String</code> specifying the field
* delimiter.
*/
@@ -59,9 +57,9 @@ public class PigStreaming implements Pig
this();
fieldDel = StorageUtil.parseFieldDel(delimiter);
}
-
+
@Override
- public byte[] serialize(Tuple t) throws IOException {
+ public WritableByteArray serializeToBytes(Tuple t) throws IOException {
out.reset();
int sz = t.size();
for (int i=0; i<sz; i++) {
@@ -76,17 +74,16 @@ public class PigStreaming implements Pig
out.write(fieldDel);
}
}
- return out.toByteArray();
+ return out;
}
@Override
- public Tuple deserialize(byte[] bytes) throws IOException {
- Text val = new Text(bytes);
- return StorageUtil.textToTuple(val, fieldDel);
+ public Tuple deserialize(byte[] bytes, int offset, int length) throws IOException {
+ return StorageUtil.bytesToTuple(bytes, offset, length, fieldDel);
}
@Override
- public LoadCaster getLoadCaster() throws IOException {
+ public LoadCaster getLoadCaster() throws IOException {
return new Utf8StorageConverter();
}
Added: pig/trunk/src/org/apache/pig/data/WritableByteArray.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/WritableByteArray.java?rev=1524181&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/WritableByteArray.java (added)
+++ pig/trunk/src/org/apache/pig/data/WritableByteArray.java Tue Sep 17 19:14:28 2013
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.ByteArrayOutputStream;
+
+/** A reusable byte buffer implementation
+ *
+ * <p>This is a {@link ByteArrayOutputStream} that additionally exposes its
+ * backing array and valid length, so the written bytes can be read
+ * without copying them out first.
+ *
+ * <p>This saves memory over creating a new byte[] and
+ * ByteArrayOutputStream each time data is written.
+ *
+ * <p>The <code>byte[]</code> constructor adopts the given array as the
+ * backing buffer (it is not copied) and treats the whole array as valid
+ * data.
+ *
+ * <p>Typical usage is something like the following:<pre>
+ *
+ * WritableByteArray buffer = new WritableByteArray();
+ * while (... loop condition ...) {
+ * buffer.reset();
+ * ... write to buffer ...
+ * byte[] data = buffer.getData();
+ * int dataLength = buffer.getLength();
+ * ... write data to its ultimate destination ...
+ * }
+ * </pre>
+ *
+ */
+public class WritableByteArray extends ByteArrayOutputStream {
+
+ public WritableByteArray() {
+ super();
+ }
+
+ public WritableByteArray(int size) {
+ super(size);
+ }
+
+ public WritableByteArray(byte[] buf) {
+ this.buf = buf;
+ this.count = buf.length;
+ }
+
+ /**
+ * Returns the backing array of the buffer itself, not a copy.
+ * The array may be longer than the data written; data is only valid
+ * up to {@link #getLength()}.
+ * @return backing array holding the contents of the buffer
+ */
+ public byte[] getData() {
+ return buf;
+ }
+
+ /**
+ * Returns the length of the valid data currently in the buffer
+ * @return number of valid bytes at the start of {@link #getData()}
+ */
+ public int getLength() {
+ return count;
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java Tue Sep 17 19:14:28 2013
@@ -20,52 +20,61 @@ package org.apache.pig.impl.streaming;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.pig.PigStreamingBase;
import org.apache.pig.PigToStream;
+import org.apache.pig.data.WritableByteArray;
import org.apache.pig.data.Tuple;
/**
- * {@link InputHandler} is responsible for handling the input to the
+ * {@link InputHandler} is responsible for handling the input to the
* Pig-Streaming external command.
- *
- * The managed executable could be fed input in a {@link InputType#SYNCHRONOUS}
- * manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS}
+ *
+ * The managed executable could be fed input in a {@link InputType#SYNCHRONOUS}
+ * manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS}
* manner via an external file which is subsequently read by the executable.
*/
public abstract class InputHandler {
/**
- *
+ *
*/
public enum InputType {SYNCHRONOUS, ASYNCHRONOUS}
-
+
/*
* The serializer to be used to send data to the managed process.
- *
+ *
* It is the responsibility of the concrete sub-classes to setup and
- * manage the serializer.
- */
+ * manage the serializer.
+ */
protected PigToStream serializer;
-
+
+ private PigStreamingBase newSerializer;
+
private OutputStream out;
-
+
// flag to mark if close() has already been called
protected boolean alreadyClosed = false;
-
+
/**
* Get the handled <code>InputType</code>
* @return the handled <code>InputType</code>
*/
public abstract InputType getInputType();
-
+
/**
* Send the given input <code>Tuple</code> to the managed executable.
- *
+ *
* @param t input <code>Tuple</code>
* @throws IOException
*/
public void putNext(Tuple t) throws IOException {
- out.write(serializer.serialize(t));
+ if (newSerializer != null) {
+ WritableByteArray buf = newSerializer.serializeToBytes(t);
+ out.write(buf.getData(), 0, buf.getLength());
+ } else {
+ out.write(serializer.serialize(t));
+ }
}
-
+
/**
* Close the <code>InputHandler</code> since there is no more input
* to be sent to the managed process.
@@ -75,7 +84,7 @@ public abstract class InputHandler {
* of the implementer to check that the process is usable. The managed process
* object is supplied by the ExecutableManager to this call so that this method
* can check if the process is alive if it needs to know.
- *
+ *
* @throws IOException
*/
public synchronized void close(Process process) throws IOException {
@@ -86,16 +95,19 @@ public abstract class InputHandler {
out = null;
}
}
-
+
/**
* Bind the <code>InputHandler</code> to the <code>OutputStream</code>
* from which it reads input and sends it to the managed process.
- *
+ *
* @param os <code>OutputStream</code> from which to read input data for the
* managed process
* @throws IOException
*/
public void bindTo(OutputStream os) throws IOException {
out = os;
+ if (this.serializer instanceof PigStreamingBase) {
+ this.newSerializer = (PigStreamingBase) serializer;
+ }
}
}
Modified: pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Tue Sep 17 19:14:28 2013
@@ -21,16 +21,17 @@ import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
+import org.apache.pig.PigStreamingBase;
import org.apache.pig.StreamToPig;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
/**
- * {@link OutputHandler} is responsible for handling the output of the
+ * {@link OutputHandler} is responsible for handling the output of the
* Pig-Streaming external command.
- *
- * The output of the managed executable could be fetched in a
- * {@link OutputType#SYNCHRONOUS} manner via its <code>stdout</code> or in an
+ *
+ * The output of the managed executable could be fetched in a
+ * {@link OutputType#SYNCHRONOUS} manner via its <code>stdout</code> or in an
* {@link OutputType#ASYNCHRONOUS} manner via an external file to which the
* process wrote its output.
*/
@@ -39,30 +40,34 @@ public abstract class OutputHandler {
/*
* The deserializer to be used to send data to the managed process.
- *
+ *
* It is the responsibility of the concrete sub-classes to setup and
- * manage the deserializer.
- */
+ * manage the deserializer.
+ */
protected StreamToPig deserializer;
-
+
+ private PigStreamingBase newDeserializer;
+
protected LineReader in = null;
+ private Text currValue = new Text();
+
private BufferedPositionedInputStream istream;
-
+
/**
* Get the handled <code>OutputType</code>.
- * @return the handled <code>OutputType</code>
+ * @return the handled <code>OutputType</code>
*/
public abstract OutputType getOutputType();
-
+
// flag to mark if close() has already been called
protected boolean alreadyClosed = false;
-
+
/**
* Bind the <code>OutputHandler</code> to the <code>InputStream</code>
* from which to read the output data of the managed process.
- *
- * @param is <code>InputStream</code> from which to read the output data
+ *
+ * @param is <code>InputStream</code> from which to read the output data
* of the managed process
* @throws IOException
*/
@@ -70,11 +75,14 @@ public abstract class OutputHandler {
long offset, long end) throws IOException {
this.istream = is;
this.in = new LineReader(istream);
+ if (this.deserializer instanceof PigStreamingBase) {
+ this.newDeserializer = (PigStreamingBase) deserializer;
+ }
}
-
+
/**
* Get the next output <code>Tuple</code> of the managed process.
- *
+ *
* @return the next output <code>Tuple</code> of the managed process
* @throws IOException
*/
@@ -83,17 +91,21 @@ public abstract class OutputHandler {
return null;
}
- Text value = new Text();
- int num = in.readLine(value);
+ currValue.clear();
+ int num = in.readLine(currValue);
if (num <= 0) {
return null;
}
-
- byte[] newBytes = new byte[value.getLength()];
- System.arraycopy(value.getBytes(), 0, newBytes, 0, value.getLength());
- return deserializer.deserialize(newBytes);
+
+ if (newDeserializer != null) {
+ return newDeserializer.deserialize(currValue.getBytes(), 0, currValue.getLength());
+ } else {
+ byte[] newBytes = new byte[currValue.getLength()];
+ System.arraycopy(currValue.getBytes(), 0, newBytes, 0, currValue.getLength());
+ return deserializer.deserialize(newBytes);
+ }
}
-
+
/**
* Close the <code>OutputHandler</code>.
* @throws IOException
Modified: pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java Tue Sep 17 19:14:28 2013
@@ -229,14 +229,24 @@ public final class StorageUtil {
* @return tuple constructed from the text
*/
public static Tuple textToTuple(Text val, byte fieldDel) {
+ return bytesToTuple(val.getBytes(), 0, val.getLength(), fieldDel);
+ }
+
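+ /**
+ * Transform bytes from a byte array to a <code>Tuple</code>, splitting fields on the delimiter
+ *
+ * @param buf the byte array
+ * @param offset index in the byte array at which to start reading
+ * @param length number of bytes to consume from the byte array.
+ * NOTE(review): the scan below stops at index <code>length</code> rather than
+ * <code>offset + length</code>; confirm the behavior for a non-zero offset
+ * @param fieldDel the field delimiter
+ * @return tuple constructed from the bytes
+ */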
+ public static Tuple bytesToTuple(byte[] buf, int offset, int length, byte fieldDel) {
- byte[] buf = val.getBytes();
- int len = val.getLength();
- int start = 0;
+ int start = offset;
ArrayList<Object> protoTuple = new ArrayList<Object>();
- for (int i = 0; i < len; i++) {
+ for (int i = offset; i < length; i++) {
if (buf[i] == fieldDel) {
readField(protoTuple, buf, start, i);
start = i + 1;
@@ -244,8 +254,8 @@ public final class StorageUtil {
}
// pick up the last field
- if (start <= len) {
- readField(protoTuple, buf, start, len);
+ if (start <= length) {
+ readField(protoTuple, buf, start, length);
}
return TupleFactory.getInstance().newTupleNoCopy(protoTuple);