You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/09/17 21:14:28 UTC

svn commit: r1524181 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/streaming/ src/org/apache/pig/impl/util/

Author: rohini
Date: Tue Sep 17 19:14:28 2013
New Revision: 1524181

URL: http://svn.apache.org/r1524181
Log:
PIG-3235: Avoid extra byte array copies in streaming (rohini)

Added:
    pig/trunk/src/org/apache/pig/PigStreamingBase.java
    pig/trunk/src/org/apache/pig/data/WritableByteArray.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigToStream.java
    pig/trunk/src/org/apache/pig/StreamToPig.java
    pig/trunk/src/org/apache/pig/builtin/PigStreaming.java
    pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java
    pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
    pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Sep 17 19:14:28 2013
@@ -30,6 +30,8 @@ PIG-3174: Remove rpm and deb artifacts f
 
 IMPROVEMENTS
 
+PIG-3235: Avoid extra byte array copies in streaming (rohini)
+
 PIG-3065: pig output format/committer should support recovery for hadoop 0.23 (daijy)
 
 PIG-3390: Make pig working with HBase 0.95 (jarcec via daijy)

Added: pig/trunk/src/org/apache/pig/PigStreamingBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigStreamingBase.java?rev=1524181&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/PigStreamingBase.java (added)
+++ pig/trunk/src/org/apache/pig/PigStreamingBase.java Tue Sep 17 19:14:28 2013
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.WritableByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This abstract base class is used for the custom mapping of
+ *    - a {@link Tuple} to a byte array. The byte array is fed to the stdin
+ *      of the streaming process.
+ *    - a byte array, received from the stdout of the streaming process,
+ *      to a {@link Tuple}.
+ *
+ * This class is designed to provide a common protocol for data exchange
+ * between Pig runtime and streaming executables.
+ *
+ * Typically, a user extends this class for a particular type of stream
+ * command and specifies the implementation class in the Pig DEFINE statement.
+ *
+ * @since Pig 0.12
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class PigStreamingBase implements PigToStream, StreamToPig {
+
+    @Override
+    @Deprecated
+    public final byte[] serialize(Tuple t) throws IOException { // bridges the deprecated byte[] API to serializeToBytes
+        WritableByteArray data = serializeToBytes(t);
+        if (data.getLength() == data.getData().length) { // backing array is exactly full: hand it back without copying
+            return data.getData();
+        } else { // backing array has unused capacity: copy out only the valid bytes
+            byte[] buf = new byte[data.getLength()];
+            System.arraycopy(data.getData(), 0, buf, 0, data.getLength());
+            return buf;
+        }
+    }
+
+    /**
+     * Given a tuple, produce the bytes to be passed to the streaming executable.
+     * Only the first <code>getLength()</code> bytes of the returned buffer's <code>getData()</code> are valid.
+     * @param t Tuple to serialize
+     * @return Serialized form of the tuple
+     * @throws IOException if the tuple cannot be serialized
+     */
+    public abstract WritableByteArray serializeToBytes(Tuple t) throws IOException;
+
+    @Override
+    @Deprecated
+    public final Tuple deserialize(byte[] bytes) throws IOException { // bridges the deprecated whole-array API
+        return deserialize(bytes, 0, bytes.length); // the entire array is treated as valid data
+    }
+
+    /**
+     * Given a byte array from a streaming executable, produce a tuple.
+     * @param bytes  bytes to deserialize.
+     * @param offset the offset in the byte array from which to deserialize.
+     * @param length the number of bytes from the offset of the byte array to deserialize.
+     * @return Data as a Pig Tuple.
+     * @throws IOException if the bytes cannot be deserialized
+     */
+    public abstract Tuple deserialize(byte[] bytes, int offset, int length) throws IOException;
+
+    /**
+     * This will be called on the front end during planning and not on the back
+     * end during execution.
+     *
+     * @return the {@link LoadCaster} associated with this object, or null if
+     * there is no such LoadCaster.
+     * @throws IOException if there is an exception during LoadCaster
+     */
+    @Override
+    public abstract LoadCaster getLoadCaster() throws IOException;
+
+}

Modified: pig/trunk/src/org/apache/pig/PigToStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigToStream.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigToStream.java (original)
+++ pig/trunk/src/org/apache/pig/PigToStream.java Tue Sep 17 19:14:28 2013
@@ -26,20 +26,26 @@ import org.apache.pig.data.Tuple;
 /**
  * The interface used for the custom mapping of a {@link Tuple} to a byte
  * array. The byte array is fed to the stdin of the streaming process.
- * 
+ *
  * This interface, together with {@link StreamToPig}, is designed to provide
- * a common protocol for data exchange between Pig runtime and streaming 
+ * a common protocol for data exchange between Pig runtime and streaming
  * executables.
- * 
- * Typically, a user implements this interface for a particular type of 
+ *
+ * Typically, a user implements this interface for a particular type of
  * stream command and specifies the implementation class in the Pig DEFINE
- * statement. 
- * @since Pig 0.7 
+ * statement.
+ * @since Pig 0.7
  */
+
+/**
+ * @deprecated Use {@link org.apache.pig.PigStreamingBase}
+ */
+
+@Deprecated
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public interface PigToStream {
-    
+
     /**
      * Given a tuple, produce an array of bytes to be passed to the streaming
      * executable.

Modified: pig/trunk/src/org/apache/pig/StreamToPig.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StreamToPig.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StreamToPig.java (original)
+++ pig/trunk/src/org/apache/pig/StreamToPig.java Tue Sep 17 19:14:28 2013
@@ -25,21 +25,27 @@ import org.apache.pig.data.Tuple;
 
 /**
  * The interface is used for the custom mapping of a byte array, received from
- * the stdout of the streaming process, to a {@link Tuple}. 
- * 
+ * the stdout of the streaming process, to a {@link Tuple}.
+ *
  * This interface, together with {@link PigToStream}, is designed to provide
- * a common protocol for data exchange between Pig runtime and streaming 
+ * a common protocol for data exchange between Pig runtime and streaming
  * executables.
- * 
- * The method <code>getLoadCaster</code> is called by Pig to convert the 
+ *
+ * The method <code>getLoadCaster</code> is called by Pig to convert the
  * fields in the byte array to typed fields of the Tuple based on a given
  * schema.
- * 
- * Typically, user implements this interface for a particular type of 
+ *
+ * Typically, user implements this interface for a particular type of
  * stream command and specifies the implementation class in the Pig DEFINE
- * statement. 
+ * statement.
  * @since Pig 0.7
  */
+
+/**
+ * @deprecated Use {@link org.apache.pig.PigStreamingBase}
+ */
+
+@Deprecated
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public interface StreamToPig {
@@ -51,12 +57,12 @@ public interface StreamToPig {
     public Tuple deserialize(byte[] bytes) throws IOException;
 
     /**
-     * This will be called on the front end during planning and not on the back 
+     * This will be called on the front end during planning and not on the back
      * end during execution.
-     * 
+     *
      * @return the {@link LoadCaster} associated with this object, or null if
-     * there is no such LoadCaster. 
-     * @throws IOException if there is an exception during LoadCaster 
+     * there is no such LoadCaster.
+     * @throws IOException if there is an exception during LoadCaster
      */
     public LoadCaster getLoadCaster() throws IOException;
 

Modified: pig/trunk/src/org/apache/pig/builtin/PigStreaming.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStreaming.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStreaming.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStreaming.java Tue Sep 17 19:14:28 2013
@@ -17,41 +17,39 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
-import org.apache.hadoop.io.Text;
 import org.apache.pig.LoadCaster;
-import org.apache.pig.PigToStream;
-import org.apache.pig.StreamToPig;
+import org.apache.pig.PigStreamingBase;
+import org.apache.pig.data.WritableByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.StorageUtil;
 
 /**
- * The default implementation of {@link PigToStream} and {@link StreamToPig}
- * interfaces. It converts tuples into <code>fieldDel</code> separated lines 
+ * The default implementation of {@link PigStreamingBase}.
+ * It converts tuples into <code>fieldDel</code> separated lines
  * and <code>fieldDel</code> separated lines into tuples.
- * 
+ *
  */
-public class PigStreaming implements PigToStream, StreamToPig {
+public class PigStreaming extends PigStreamingBase {
 
     private byte recordDel = '\n';
-    
+
     private byte fieldDel = '\t';
-    
-    private ByteArrayOutputStream out;
-    
+
+    private WritableByteArray out;
+
     /**
      * The constructor that uses the default field delimiter.
      */
     public PigStreaming() {
-        out = new ByteArrayOutputStream();
+        out = new WritableByteArray();
     }
-    
+
     /**
      * The constructor that accepts a user-specified field
      * delimiter.
-     * 
+     *
      * @param delimiter a <code>String</code> specifying the field
      * delimiter.
      */
@@ -59,9 +57,9 @@ public class PigStreaming implements Pig
         this();
         fieldDel = StorageUtil.parseFieldDel(delimiter);
     }
-    
+
     @Override
-    public byte[] serialize(Tuple t) throws IOException {
+    public WritableByteArray serializeToBytes(Tuple t) throws IOException {
         out.reset();
         int sz = t.size();
         for (int i=0; i<sz; i++) {
@@ -76,17 +74,16 @@ public class PigStreaming implements Pig
                 out.write(fieldDel);
             }
         }
-        return out.toByteArray();
+        return out;
     }
 
     @Override
-    public Tuple deserialize(byte[] bytes) throws IOException {
-        Text val = new Text(bytes);
-        return StorageUtil.textToTuple(val, fieldDel);
+    public Tuple deserialize(byte[] bytes, int offset, int length) throws IOException {
+        return StorageUtil.bytesToTuple(bytes, offset, length, fieldDel);
     }
 
     @Override
-    public LoadCaster getLoadCaster() throws IOException {        
+    public LoadCaster getLoadCaster() throws IOException {
         return new Utf8StorageConverter();
     }
 

Added: pig/trunk/src/org/apache/pig/data/WritableByteArray.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/WritableByteArray.java?rev=1524181&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/WritableByteArray.java (added)
+++ pig/trunk/src/org/apache/pig/data/WritableByteArray.java Tue Sep 17 19:14:28 2013
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.ByteArrayOutputStream;
+
+/** A reusable byte buffer implementation
+ *
+ * <p>This saves memory over creating a new byte[] and
+ * ByteArrayOutputStream each time data is written.
+ *
+ * <p>Typical usage is something like the following:<pre>
+ *
+ * WritableByteArray buffer = new WritableByteArray();
+ * while (... loop condition ...) {
+ *   buffer.reset();
+ *   ... write to buffer ...
+ *   byte[] data = buffer.getData();
+ *   int dataLength = buffer.getLength();
+ *   ... write data to its ultimate destination ...
+ * }
+ * </pre>
+ *
+ */
+public class WritableByteArray extends ByteArrayOutputStream {
+
+    public WritableByteArray() { // empty buffer with the superclass's default capacity
+        super();
+    }
+
+    public WritableByteArray(int size) { // empty buffer; size is passed to the superclass constructor
+        super(size);
+    }
+
+    public WritableByteArray(byte[] buf) { // wraps an existing array instead of allocating a copy
+        this.buf = buf; // the caller's array becomes the backing store, so later writes may modify or replace it
+        this.count = buf.length; // every byte of the supplied array is treated as valid data
+    }
+
+    /**
+     * Returns the backing array itself, not a copy, so no bytes are duplicated.
+     * Data is only valid to {@link #getLength()}.
+     * @return contents of the buffer
+     */
+    public byte[] getData() {
+        return buf;
+    }
+
+    /**
+     * Returns the length of the valid data currently in the buffer
+     * @return length of valid data
+     */
+    public int getLength() {
+        return count;
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java Tue Sep 17 19:14:28 2013
@@ -20,52 +20,61 @@ package org.apache.pig.impl.streaming;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.pig.PigStreamingBase;
 import org.apache.pig.PigToStream;
+import org.apache.pig.data.WritableByteArray;
 import org.apache.pig.data.Tuple;
 
 /**
- * {@link InputHandler} is responsible for handling the input to the 
+ * {@link InputHandler} is responsible for handling the input to the
  * Pig-Streaming external command.
- * 
- * The managed executable could be fed input in a {@link InputType#SYNCHRONOUS} 
- * manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS} 
+ *
+ * The managed executable could be fed input in a {@link InputType#SYNCHRONOUS}
+ * manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS}
  * manner via an external file which is subsequently read by the executable.
  */
 public abstract class InputHandler {
     /**
-     * 
+     *
      */
     public enum InputType {SYNCHRONOUS, ASYNCHRONOUS}
-   
+
     /*
      * The serializer to be used to send data to the managed process.
-     * 
+     *
      * It is the responsibility of the concrete sub-classes to setup and
-     * manage the serializer. 
-     */  
+     * manage the serializer.
+     */
     protected PigToStream serializer;
-    
+
+    private PigStreamingBase newSerializer;
+
     private OutputStream out;
-    
+
     // flag to mark if close() has already been called
     protected boolean alreadyClosed = false;
-    
+
     /**
      * Get the handled <code>InputType</code>
      * @return the handled <code>InputType</code>
      */
     public abstract InputType getInputType();
-    
+
     /**
      * Send the given input <code>Tuple</code> to the managed executable.
-     * 
+     *
      * @param t input <code>Tuple</code>
      * @throws IOException
      */
     public void putNext(Tuple t) throws IOException {
-        out.write(serializer.serialize(t));
+        if (newSerializer != null) {
+            WritableByteArray buf = newSerializer.serializeToBytes(t);
+            out.write(buf.getData(), 0, buf.getLength());
+        } else {
+            out.write(serializer.serialize(t));
+        }
     }
-    
+
     /**
      * Close the <code>InputHandler</code> since there is no more input
      * to be sent to the managed process.
@@ -75,7 +84,7 @@ public abstract class InputHandler {
      * of the implementer to check that the process is usable. The managed process
      * object is supplied by the ExecutableManager to this call so that this method
      * can check if the process is alive if it needs to know.
-     * 
+     *
      * @throws IOException
      */
     public synchronized void close(Process process) throws IOException {
@@ -86,16 +95,19 @@ public abstract class InputHandler {
             out = null;
         }
     }
-    
+
     /**
      * Bind the <code>InputHandler</code> to the <code>OutputStream</code>
      * from which it reads input and sends it to the managed process.
-     * 
+     *
      * @param os <code>OutputStream</code> from which to read input data for the
      *           managed process
      * @throws IOException
      */
     public void bindTo(OutputStream os) throws IOException {
         out = os;
+        if (this.serializer instanceof PigStreamingBase) {
+            this.newSerializer = (PigStreamingBase) serializer;
+        }
     }
 }

Modified: pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Tue Sep 17 19:14:28 2013
@@ -21,16 +21,17 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.LineReader;
+import org.apache.pig.PigStreamingBase;
 import org.apache.pig.StreamToPig;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 
 /**
- * {@link OutputHandler} is responsible for handling the output of the 
+ * {@link OutputHandler} is responsible for handling the output of the
  * Pig-Streaming external command.
- * 
- * The output of the managed executable could be fetched in a 
- * {@link OutputType#SYNCHRONOUS} manner via its <code>stdout</code> or in an 
+ *
+ * The output of the managed executable could be fetched in a
+ * {@link OutputType#SYNCHRONOUS} manner via its <code>stdout</code> or in an
  * {@link OutputType#ASYNCHRONOUS} manner via an external file to which the
  * process wrote its output.
  */
@@ -39,30 +40,34 @@ public abstract class OutputHandler {
 
     /*
      * The deserializer to be used to send data to the managed process.
-     * 
+     *
      * It is the responsibility of the concrete sub-classes to setup and
-     * manage the deserializer. 
-     */  
+     * manage the deserializer.
+     */
     protected StreamToPig deserializer;
-    
+
+    private PigStreamingBase newDeserializer;
+
     protected LineReader in = null;
 
+    private Text currValue = new Text();
+
     private BufferedPositionedInputStream istream;
-    
+
     /**
      * Get the handled <code>OutputType</code>.
-     * @return the handled <code>OutputType</code> 
+     * @return the handled <code>OutputType</code>
      */
     public abstract OutputType getOutputType();
-    
+
     // flag to mark if close() has already been called
     protected boolean alreadyClosed = false;
-    
+
     /**
      * Bind the <code>OutputHandler</code> to the <code>InputStream</code>
      * from which to read the output data of the managed process.
-     * 
-     * @param is <code>InputStream</code> from which to read the output data 
+     *
+     * @param is <code>InputStream</code> from which to read the output data
      *           of the managed process
      * @throws IOException
      */
@@ -70,11 +75,14 @@ public abstract class OutputHandler {
                        long offset, long end) throws IOException {
         this.istream  = is;
         this.in = new LineReader(istream);
+        if (this.deserializer instanceof PigStreamingBase) {
+            this.newDeserializer = (PigStreamingBase) deserializer;
+        }
     }
-    
+
     /**
      * Get the next output <code>Tuple</code> of the managed process.
-     * 
+     *
      * @return the next output <code>Tuple</code> of the managed process
      * @throws IOException
      */
@@ -83,17 +91,21 @@ public abstract class OutputHandler {
             return null;
         }
 
-        Text value = new Text();
-        int num = in.readLine(value);
+        currValue.clear();
+        int num = in.readLine(currValue);
         if (num <= 0) {
             return null;
         }
-        
-        byte[] newBytes = new byte[value.getLength()];
-        System.arraycopy(value.getBytes(), 0, newBytes, 0, value.getLength());
-        return deserializer.deserialize(newBytes);
+
+        if (newDeserializer != null) {
+            return newDeserializer.deserialize(currValue.getBytes(), 0, currValue.getLength());
+        } else {
+            byte[] newBytes = new byte[currValue.getLength()];
+            System.arraycopy(currValue.getBytes(), 0, newBytes, 0, currValue.getLength());
+            return deserializer.deserialize(newBytes);
+        }
     }
-    
+
     /**
      * Close the <code>OutputHandler</code>.
      * @throws IOException

Modified: pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java?rev=1524181&r1=1524180&r2=1524181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java Tue Sep 17 19:14:28 2013
@@ -229,14 +229,24 @@ public final class StorageUtil {
      * @return tuple constructed from the text
      */
     public static Tuple textToTuple(Text val, byte fieldDel) {
+        return bytesToTuple(val.getBytes(), 0, val.getLength(), fieldDel);
+    }
+
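+    /**
+     * Transform bytes from a byte array up to the specified length to a <code>Tuple</code>
+     * @param buf the byte array
+     * @param offset index in the byte array at which reading starts
+     * @param length number of bytes to consume from the byte array (NOTE(review): the code below uses it as an end index rather than offset + length; verify for non-zero offset)
+     * @param fieldDel the field delimiter
+     * @return tuple constructed from the bytes
+     */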
+    public static Tuple bytesToTuple(byte[] buf, int offset, int length, byte fieldDel) {
 
-        byte[] buf = val.getBytes();
-        int len = val.getLength();
-        int start = 0;
+        int start = offset;
 
         ArrayList<Object> protoTuple = new ArrayList<Object>();
 
-        for (int i = 0; i < len; i++) {
+        for (int i = offset; i < length; i++) {
             if (buf[i] == fieldDel) {
                 readField(protoTuple, buf, start, i);
                 start = i + 1;
@@ -244,8 +254,8 @@ public final class StorageUtil {
         }
 
         // pick up the last field
-        if (start <= len) {
-            readField(protoTuple, buf, start, len);
+        if (start <= length) {
+            readField(protoTuple, buf, start, length);
         }
 
         return TupleFactory.getInstance().newTupleNoCopy(protoTuple);