Posted to commits@hive.apache.org by ss...@apache.org on 2016/04/05 00:37:47 UTC

[03/24] hive git commit: HIVE-13255: FloatTreeReader.nextVector is expensive (Prasanth Jayachandran reviewed by Gopal V)

HIVE-13255: FloatTreeReader.nextVector is expensive (Prasanth Jayachandran reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8225cb6a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8225cb6a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8225cb6a

Branch: refs/heads/llap
Commit: 8225cb6aedba7e49515da44f092405994f9a22b6
Parents: 4008845
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Thu Mar 31 02:48:01 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Mar 31 02:48:01 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/orc/impl/IntegerReader.java |  4 +-
 .../apache/orc/impl/RunLengthIntegerReader.java |  7 +--
 .../orc/impl/RunLengthIntegerReaderV2.java      |  7 +--
 .../org/apache/orc/impl/SerializationUtils.java | 34 ++++++++++-----
 .../apache/orc/impl/TestSerializationUtils.java | 45 +++++++++++++++++--
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 16 +++----
 .../hive/ql/io/orc/TreeReaderFactory.java       | 46 ++++++++++----------
 7 files changed, 99 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
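
The heart of the change is in SerializationUtils: readFloat/writeFloat (and
readLongLE) now move each value through a small reusable byte buffer with a
single bulk read/write instead of four single-byte InputStream/OutputStream
calls, and batch-size parameters are carried as int rather than long. A
minimal, self-contained sketch of the buffered little-endian float round trip
follows; class and method names are illustrative, not the ORC API.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Sketch of the pattern used by the patch: serialize the float bits into a
// reusable buffer and hand the stream one 4-byte chunk per value.
public class LittleEndianFloatSketch {
  private final byte[] buf = new byte[8];   // reused across calls, as in the patch

  public void writeFloat(OutputStream out, float value) throws IOException {
    int bits = Float.floatToIntBits(value);
    buf[0] = (byte) (bits & 0xff);
    buf[1] = (byte) ((bits >> 8) & 0xff);
    buf[2] = (byte) ((bits >> 16) & 0xff);
    buf[3] = (byte) ((bits >> 24) & 0xff);
    out.write(buf, 0, 4);                   // one call instead of four
  }

  public float readFloat(InputStream in) throws IOException {
    readFully(in, buf, 0, 4);               // one bulk (looped) read instead of four
    int bits = (buf[0] & 0xff)
        | ((buf[1] & 0xff) << 8)
        | ((buf[2] & 0xff) << 16)
        | ((buf[3] & 0xff) << 24);
    return Float.intBitsToFloat(bits);
  }

  // InputStream.read may return fewer bytes than requested, so loop until done.
  private static void readFully(InputStream in, byte[] b, int off, int len)
      throws IOException {
    int n = 0;
    while (n < len) {
      int count = in.read(b, off + n, len - n);
      if (count < 0) {
        throw new EOFException("read past end of stream");
      }
      n += count;
    }
  }

  public static void main(String[] args) throws IOException {
    LittleEndianFloatSketch s = new LittleEndianFloatSketch();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    s.writeFloat(out, 3.14f);
    float back = s.readFloat(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(back);               // prints 3.14
  }
}

Cutting the per-value stream calls is presumably the overhead the JIRA title
refers to; the readFully loop also covers streams that legally return fewer
bytes than requested.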


http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
index b928559..7dfd289 100644
--- a/orc/src/java/org/apache/orc/impl/IntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java
@@ -60,8 +60,6 @@ public interface IntegerReader {
    * @return
    * @throws IOException
    */
-   void nextVector(LongColumnVector previous, long previousLen)
+   void nextVector(LongColumnVector previous, final int previousLen)
       throws IOException;
-
-  void setInStream(InStream data);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
index f129c86..0c90cde 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
@@ -99,7 +99,7 @@ public class RunLengthIntegerReader implements IntegerReader {
   }
 
   @Override
-  public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
+  public void nextVector(LongColumnVector previous, final int previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
@@ -122,11 +122,6 @@ public class RunLengthIntegerReader implements IntegerReader {
   }
 
   @Override
-  public void setInStream(InStream data) {
-    input = data;
-  }
-
-  @Override
   public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();

http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
index 5f2a673..c6d685a 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
@@ -360,7 +360,7 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
   }
 
   @Override
-  public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
+  public void nextVector(LongColumnVector previous, final int previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
@@ -382,9 +382,4 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
       }
     }
   }
-
-  @Override
-  public void setInStream(InStream data) {
-    input = data;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/orc/src/java/org/apache/orc/impl/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/SerializationUtils.java b/orc/src/java/org/apache/orc/impl/SerializationUtils.java
index c1162e4..2e5a59b 100644
--- a/orc/src/java/org/apache/orc/impl/SerializationUtils.java
+++ b/orc/src/java/org/apache/orc/impl/SerializationUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.orc.impl;
 
-import org.apache.orc.impl.InStream;
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -77,18 +75,22 @@ public final class SerializationUtils {
   }
 
   public float readFloat(InputStream in) throws IOException {
-    int ser = in.read() | (in.read() << 8) | (in.read() << 16) |
-      (in.read() << 24);
-    return Float.intBitsToFloat(ser);
+    readFully(in, readBuffer, 0, 4);
+    int val = (((readBuffer[0] & 0xff) << 0)
+        + ((readBuffer[1] & 0xff) << 8)
+        + ((readBuffer[2] & 0xff) << 16)
+        + ((readBuffer[3] & 0xff) << 24));
+    return Float.intBitsToFloat(val);
   }
 
   public void writeFloat(OutputStream output,
                          float value) throws IOException {
     int ser = Float.floatToIntBits(value);
-    output.write(ser & 0xff);
-    output.write((ser >> 8) & 0xff);
-    output.write((ser >> 16) & 0xff);
-    output.write((ser >> 24) & 0xff);
+    writeBuffer[0] = (byte) ((ser >> 0)  & 0xff);
+    writeBuffer[1] = (byte) ((ser >> 8)  & 0xff);
+    writeBuffer[2] = (byte) ((ser >> 16) & 0xff);
+    writeBuffer[3] = (byte) ((ser >> 24) & 0xff);
+    output.write(writeBuffer, 0, 4);
   }
 
   public double readDouble(InputStream in) throws IOException {
@@ -96,7 +98,7 @@ public final class SerializationUtils {
   }
 
   public long readLongLE(InputStream in) throws IOException {
-    in.read(readBuffer, 0, 8);
+    readFully(in, readBuffer, 0, 8);
     return (((readBuffer[0] & 0xff) << 0)
         + ((readBuffer[1] & 0xff) << 8)
         + ((readBuffer[2] & 0xff) << 16)
@@ -107,6 +109,18 @@ public final class SerializationUtils {
         + ((long) (readBuffer[7] & 0xff) << 56));
   }
 
+  private void readFully(final InputStream in, final byte[] buffer, final int off, final int len)
+      throws IOException {
+    int n = 0;
+    while (n < len) {
+      int count = in.read(buffer, off + n, len - n);
+      if (count < 0) {
+        throw new EOFException("Read past EOF for " + in);
+      }
+      n += count;
+    }
+  }
+
   public void writeDouble(OutputStream output,
                           double value) throws IOException {
     writeLongLE(output, Double.doubleToLongBits(value));
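
A side note on the readLongLE hunk above: replacing in.read(readBuffer, 0, 8)
with readFully(...) is a correctness guard as well as a buffering change,
because InputStream.read(byte[], int, int) may legally return fewer bytes than
requested. A small, hypothetical demonstration (the one-byte-at-a-time wrapper
below is illustrative only, not part of the patch):

import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Wrapper that hands back at most one byte per bulk read() call,
// which any InputStream is allowed to do.
class ShortReadInputStream extends FilterInputStream {
  ShortReadInputStream(InputStream in) { super(in); }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    return super.read(b, off, Math.min(len, 1));
  }
}

public class ShortReadDemo {
  public static void main(String[] args) throws IOException {
    byte[] data = {1, 2, 3, 4, 5, 6, 7, 8};
    InputStream in = new ShortReadInputStream(new ByteArrayInputStream(data));
    byte[] buf = new byte[8];
    int got = in.read(buf, 0, 8);
    System.out.println("single read() returned " + got + " byte(s)");  // prints 1, not 8
  }
}

Against such a stream a single read(buf, 0, 8) fills only the first byte and
leaves the rest of the buffer stale; the readFully loop added in this hunk
keeps reading until all eight bytes arrive or EOF is reported.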

http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java b/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java
index 0785412..4a8a0f2 100644
--- a/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java
+++ b/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java
@@ -25,6 +25,9 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
 import org.junit.Test;
 
@@ -156,9 +159,43 @@ public class TestSerializationUtils {
     assertEquals(Long.MIN_VALUE, LongMath.checkedSubtract(Long.MIN_VALUE, 0));
   }
 
-  public static void main(String[] args) throws Exception {
-    TestSerializationUtils test = new TestSerializationUtils();
-    test.testDoubles();
-    test.testBigIntegers();
+  @Test
+  public void testRandomFloats() throws Exception {
+    float tolerance = 0.0000000000000001f;
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    SerializationUtils utils = new SerializationUtils();
+    Random rand = new Random();
+    int n = 100_000;
+    float[] expected = new float[n];
+    for (int i = 0; i < n; i++) {
+      float f = rand.nextFloat();
+      expected[i] = f;
+      utils.writeFloat(buffer, f);
+    }
+    InputStream newBuffer = fromBuffer(buffer);
+    for (int i = 0; i < n; i++) {
+      float got = utils.readFloat(newBuffer);
+      assertEquals(expected[i], got, tolerance);
+    }
+  }
+
+  @Test
+  public void testRandomDoubles() throws Exception {
+    double tolerance = 0.0000000000000001;
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    SerializationUtils utils = new SerializationUtils();
+    Random rand = new Random();
+    int n = 100_000;
+    double[] expected = new double[n];
+    for (int i = 0; i < n; i++) {
+      double d = rand.nextDouble();
+      expected[i] = d;
+      utils.writeDouble(buffer, d);
+    }
+    InputStream newBuffer = fromBuffer(buffer);
+    for (int i = 0; i < n; i++) {
+      double got = utils.readDouble(newBuffer);
+      assertEquals(expected[i], got, tolerance);
+    }
   }
 }
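
The new round-trip tests use a fromBuffer(...) helper that is not part of this
hunk and presumably already exists elsewhere in TestSerializationUtils.
Assuming it simply re-exposes the written bytes as a stream, a minimal
equivalent (using the java.io imports already present in the file) would be:

  // Assumed shape of the existing helper: wrap the bytes written so far
  // back into an InputStream so they can be read back and compared.
  private static InputStream fromBuffer(ByteArrayOutputStream buffer) {
    return new ByteArrayInputStream(buffer.toByteArray());
  }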

http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index aa835ae..3975409 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -1060,7 +1060,7 @@ public class RecordReaderImpl implements RecordReader {
         readStripe();
       }
 
-      long batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE);
+      final int batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE);
 
       rowInStripe += batchSize;
       if (previous == null) {
@@ -1068,13 +1068,13 @@ public class RecordReaderImpl implements RecordReader {
         result = new VectorizedRowBatch(cols.length);
         result.cols = cols;
       } else {
-        result = (VectorizedRowBatch) previous;
+        result = previous;
         result.selectedInUse = false;
         reader.setVectorColumnCount(result.getDataColumnCount());
-        reader.nextVector(result.cols, (int) batchSize);
+        reader.nextVector(result.cols, batchSize);
       }
 
-      result.size = (int) batchSize;
+      result.size = batchSize;
       advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
       return result;
     } catch (IOException e) {
@@ -1083,8 +1083,8 @@ public class RecordReaderImpl implements RecordReader {
     }
   }
 
-  private long computeBatchSize(long targetBatchSize) {
-    long batchSize = 0;
+  private int computeBatchSize(long targetBatchSize) {
+    final int batchSize;
     // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
     // groups are selected then marker position is set to the end of range (subset of row groups
     // within strip). Batch size computed out of marker position makes sure that batch size is
@@ -1106,13 +1106,13 @@ public class RecordReaderImpl implements RecordReader {
       final long markerPosition =
           (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
               : rowCountInStripe;
-      batchSize = Math.min(targetBatchSize, (markerPosition - rowInStripe));
+      batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe));
 
       if (isLogDebugEnabled && batchSize < targetBatchSize) {
         LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
       }
     } else {
-      batchSize = Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
+      batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
     }
     return batchSize;
   }
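
The remaining hunks, here and in TreeReaderFactory below, thread the batch size
through as int instead of long. A hedged reading of why: after the Math.min the
value is bounded by VectorizedRowBatch.DEFAULT_SIZE, it is used directly as an
array index and loop bound, and computing it once as int removes the (int)
casts at every call site. An illustrative, self-contained sketch (names are
stand-ins, not the actual Hive code):

// Shows the narrowing done once in a computeBatchSize-style helper so callers
// can use the result directly as a loop bound and array size.
public class BatchSizeSketch {
  static final int DEFAULT_SIZE = 1024;   // stands in for VectorizedRowBatch.DEFAULT_SIZE

  static int computeBatchSize(long rowsRemaining) {
    // The result of min is at most DEFAULT_SIZE, so the narrowing cast cannot overflow.
    return (int) Math.min(DEFAULT_SIZE, rowsRemaining);
  }

  public static void main(String[] args) {
    int batchSize = computeBatchSize(5_000_000L);
    System.out.println(batchSize);        // prints 1024
  }
}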

http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 620ad53..d74a854 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -239,7 +239,7 @@ public class TreeReaderFactory {
      * @return next column vector
      * @throws IOException
      */
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       ColumnVector result = (ColumnVector) previousVector;
       if (present != null) {
         // Set noNulls and isNull vector of the ColumnVector based on
@@ -322,7 +322,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final LongColumnVector result;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -387,7 +387,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final LongColumnVector result;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -473,7 +473,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final LongColumnVector result;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -559,7 +559,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final LongColumnVector result;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -646,7 +646,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final LongColumnVector result;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -719,7 +719,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final DoubleColumnVector result;
       if (previousVector == null) {
         result = new DoubleColumnVector();
@@ -832,7 +832,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final DoubleColumnVector result;
       if (previousVector == null) {
         result = new DoubleColumnVector();
@@ -974,7 +974,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final BytesColumnVector result;
       if (previousVector == null) {
         result = new BytesColumnVector();
@@ -1144,7 +1144,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final TimestampColumnVector result;
       if (previousVector == null) {
         result = new TimestampColumnVector();
@@ -1253,7 +1253,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final LongColumnVector result;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -1352,7 +1352,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final DecimalColumnVector result;
       if (previousVector == null) {
         result = new DecimalColumnVector(precision, scale);
@@ -1481,7 +1481,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       return reader.nextVector(previousVector, batchSize);
     }
 
@@ -1498,7 +1498,7 @@ public class TreeReaderFactory {
 
     private static byte[] commonReadByteArrays(InStream stream, IntegerReader lengths,
         LongColumnVector scratchlcv,
-        BytesColumnVector result, long batchSize) throws IOException {
+        BytesColumnVector result, final int batchSize) throws IOException {
       // Read lengths
       scratchlcv.isNull = result.isNull;  // Notice we are replacing the isNull vector here...
       lengths.nextVector(scratchlcv, batchSize);
@@ -1534,7 +1534,7 @@ public class TreeReaderFactory {
     // This method has the common code for reading in bytes into a BytesColumnVector.
     public static void readOrcByteArrays(InStream stream, IntegerReader lengths,
         LongColumnVector scratchlcv,
-        BytesColumnVector result, long batchSize) throws IOException {
+        BytesColumnVector result, final int batchSize) throws IOException {
 
       byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, result, batchSize);
 
@@ -1641,7 +1641,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final BytesColumnVector result;
       if (previousVector == null) {
         result = new BytesColumnVector();
@@ -1815,7 +1815,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final BytesColumnVector result;
       int offset;
       int length;
@@ -1926,7 +1926,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (right trim and truncate) if necessary.
       BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
@@ -2000,7 +2000,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (truncate) if necessary.
       BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
@@ -2137,7 +2137,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       final ColumnVector[] result;
       if (previousVector == null) {
         result = new ColumnVector[readColumnCount];
@@ -2242,7 +2242,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
       throw new UnsupportedOperationException(
           "NextVector is not supported operation for Union type");
     }
@@ -2325,7 +2325,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previous, long batchSize) throws IOException {
+    public Object nextVector(Object previous, final int batchSize) throws IOException {
       throw new UnsupportedOperationException(
           "NextVector is not supported operation for List type");
     }
@@ -2419,7 +2419,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previous, long batchSize) throws IOException {
+    public Object nextVector(Object previous, final int batchSize) throws IOException {
       throw new UnsupportedOperationException(
           "NextVector is not supported operation for Map type");
     }