You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/11/20 22:06:05 UTC

svn commit: r1543947 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc: InStream.java RecordReaderImpl.java

Author: hashutosh
Date: Wed Nov 20 21:06:05 2013
New Revision: 1543947

URL: http://svn.apache.org/r1543947
Log:
HIVE-5663 : Refactor ORC RecordReader to operate on direct & wrapped ByteBuffers (Gopal V via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1543947&r1=1543946&r2=1543947&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Wed Nov 20 21:06:05 2013
@@ -29,10 +29,8 @@ abstract class InStream extends InputStr
     private final long[] offsets;
     private final long length;
     private long currentOffset;
-    private byte[] range;
+    private ByteBuffer range;
     private int currentRange;
-    private int offsetInRange;
-    private int limitInRange;
 
     public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
                               long length) {
@@ -41,42 +39,39 @@ abstract class InStream extends InputStr
       this.offsets = offsets;
       this.length = length;
       currentRange = 0;
-      offsetInRange = 0;
-      limitInRange = 0;
       currentOffset = 0;
     }
 
     @Override
     public int read() {
-      if (offsetInRange >= limitInRange) {
+      if (range == null || range.remaining() == 0) {
         if (currentOffset == length) {
           return -1;
         }
         seek(currentOffset);
       }
       currentOffset += 1;
-      return 0xff & range[offsetInRange++];
+      return 0xff & range.get();
     }
 
     @Override
     public int read(byte[] data, int offset, int length) {
-      if (offsetInRange >= limitInRange) {
+      if (range == null || range.remaining() == 0) {
         if (currentOffset == this.length) {
           return -1;
         }
         seek(currentOffset);
       }
-      int actualLength = Math.min(length, limitInRange - offsetInRange);
-      System.arraycopy(range, offsetInRange, data, offset, actualLength);
-      offsetInRange += actualLength;
+      int actualLength = Math.min(length, range.remaining());
+      range.get(data, offset, actualLength);
       currentOffset += actualLength;
       return actualLength;
     }
 
     @Override
     public int available() {
-      if (offsetInRange < limitInRange) {
-        return limitInRange - offsetInRange;
+      if (range != null && range.remaining() > 0) {
+        return range.remaining();
       }
       return (int) (length - currentOffset);
     }
@@ -85,6 +80,10 @@ abstract class InStream extends InputStr
     public void close() {
       currentRange = bytes.length;
       currentOffset = length;
+      // explicit de-ref of bytes[]
+      for(int i = 0; i < bytes.length; i++) {
+        bytes[i] = null;
+      }
     }
 
     @Override
@@ -98,10 +97,10 @@ abstract class InStream extends InputStr
             desired - offsets[i] < bytes[i].remaining()) {
           currentOffset = desired;
           currentRange = i;
-          this.range = bytes[i].array();
-          offsetInRange = bytes[i].arrayOffset() + bytes[i].position();
-          limitInRange = bytes[i].arrayOffset() + bytes[i].limit();
-          offsetInRange += desired - offsets[i];
+          this.range = bytes[i].duplicate();
+          int pos = range.position();
+          pos += (int)(desired - offsets[i]); // this is why we duplicate
+          this.range.position(pos);
           return;
         }
       }
@@ -113,7 +112,7 @@ abstract class InStream extends InputStr
     public String toString() {
       return "uncompressed stream " + name + " position: " + currentOffset +
           " length: " + length + " range: " + currentRange +
-          " offset: " + offsetInRange + " limit: " + limitInRange;
+          " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
     }
   }
 
@@ -123,14 +122,13 @@ abstract class InStream extends InputStr
     private final long[] offsets;
     private final int bufferSize;
     private final long length;
-    private ByteBuffer uncompressed = null;
+    private ByteBuffer uncompressed;
     private final CompressionCodec codec;
-    private byte[] compressed = null;
+    private ByteBuffer compressed;
     private long currentOffset;
     private int currentRange;
-    private int offsetInCompressed;
-    private int limitInCompressed;
     private boolean isUncompressedOriginal;
+    private boolean isDirect = false;
 
     public CompressedStream(String name, ByteBuffer[] input,
                             long[] offsets, long length,
@@ -140,49 +138,59 @@ abstract class InStream extends InputStr
       this.name = name;
       this.codec = codec;
       this.length = length;
+      if(this.length > 0) {
+        isDirect = this.bytes[0].isDirect();
+      }
       this.offsets = offsets;
       this.bufferSize = bufferSize;
       currentOffset = 0;
       currentRange = 0;
-      offsetInCompressed = 0;
-      limitInCompressed = 0;
+    }
+
+    private ByteBuffer allocateBuffer(int size) {
+      // TODO: use the same pool as the ORC readers
+      if(isDirect == true) {
+        return ByteBuffer.allocateDirect(size);
+      } else {
+        return ByteBuffer.allocate(size);
+      }
     }
 
     private void readHeader() throws IOException {
-      if (compressed == null || offsetInCompressed >= limitInCompressed) {
+      if (compressed == null || compressed.remaining() <= 0) {
         seek(currentOffset);
       }
-      if (limitInCompressed - offsetInCompressed > OutStream.HEADER_SIZE) {
-        int chunkLength = ((0xff & compressed[offsetInCompressed + 2]) << 15) |
-          ((0xff & compressed[offsetInCompressed + 1]) << 7) |
-            ((0xff & compressed[offsetInCompressed]) >> 1);
+      if (compressed.remaining() > OutStream.HEADER_SIZE) {
+        int b0 = compressed.get() & 0xff;
+        int b1 = compressed.get() & 0xff;
+        int b2 = compressed.get() & 0xff;
+        boolean isOriginal = (b0 & 0x01) == 1;
+        int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
         if (chunkLength > bufferSize) {
           throw new IllegalArgumentException("Buffer size too small. size = " +
               bufferSize + " needed = " + chunkLength);
         }
-        boolean isOriginal = (compressed[offsetInCompressed] & 0x01) == 1;
-        offsetInCompressed += OutStream.HEADER_SIZE;
+        // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
+		assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
+        currentOffset += OutStream.HEADER_SIZE;
+
+        ByteBuffer slice = this.slice(chunkLength);
+
         if (isOriginal) {
+          uncompressed = slice;
           isUncompressedOriginal = true;
-          uncompressed = bytes[currentRange].duplicate();
-          uncompressed.position(offsetInCompressed -
-              bytes[currentRange].arrayOffset());
-          uncompressed.limit(offsetInCompressed + chunkLength);
         } else {
           if (isUncompressedOriginal) {
-            uncompressed = ByteBuffer.allocate(bufferSize);
+            uncompressed = allocateBuffer(bufferSize);
             isUncompressedOriginal = false;
           } else if (uncompressed == null) {
-            uncompressed = ByteBuffer.allocate(bufferSize);
+            uncompressed = allocateBuffer(bufferSize);
           } else {
             uncompressed.clear();
           }
-          codec.decompress(ByteBuffer.wrap(compressed, offsetInCompressed,
-              chunkLength),
-            uncompressed);
+          codec.decompress(slice, uncompressed);
         }
-        offsetInCompressed += chunkLength;
-        currentOffset += chunkLength + OutStream.HEADER_SIZE;
       } else {
         throw new IllegalStateException("Can't read header at " + this);
       }
@@ -208,10 +216,7 @@ abstract class InStream extends InputStr
         readHeader();
       }
       int actualLength = Math.min(length, uncompressed.remaining());
-      System.arraycopy(uncompressed.array(),
-        uncompressed.arrayOffset() + uncompressed.position(), data,
-        offset, actualLength);
-      uncompressed.position(uncompressed.position() + actualLength);
+      uncompressed.get(data, offset, actualLength);
       return actualLength;
     }
 
@@ -229,10 +234,12 @@ abstract class InStream extends InputStr
     @Override
     public void close() {
       uncompressed = null;
+      compressed = null;
       currentRange = bytes.length;
-      offsetInCompressed = 0;
-      limitInCompressed = 0;
       currentOffset = length;
+      for(int i = 0; i < bytes.length; i++) {
+        bytes[i] = null;
+      }
     }
 
     @Override
@@ -249,16 +256,62 @@ abstract class InStream extends InputStr
       }
     }
 
+    /* slices a read only contigous buffer of chunkLength */
+    private ByteBuffer slice(int chunkLength) throws IOException {
+      int len = chunkLength;
+      final long oldOffset = currentOffset;
+      ByteBuffer slice;
+      if (compressed.remaining() >= len) {
+        slice = compressed.slice();
+        // simple case
+        slice.limit(len);
+        currentOffset += len;
+        compressed.position(compressed.position() + len);
+        return slice;
+      } else if (currentRange >= (bytes.length - 1)) {
+        // nothing has been modified yet
+        throw new IOException("EOF in " + this + " while trying to read " +
+            chunkLength + " bytes");
+      }
+
+      // we need to consolidate 2 or more buffers into 1
+      // first clear out compressed buffers
+      ByteBuffer copy = allocateBuffer(chunkLength);
+      currentOffset += compressed.remaining();
+      len -= compressed.remaining();
+      copy.put(compressed);
+
+      while (len > 0 && (++currentRange) < bytes.length) {
+        compressed = bytes[currentRange].duplicate();
+        if (compressed.remaining() >= len) {
+          slice = compressed.slice();
+          slice.limit(len);
+          copy.put(slice);
+          currentOffset += len;
+          compressed.position(compressed.position() + len);
+          return copy;
+        }
+        currentOffset += compressed.remaining();
+        len -= compressed.remaining();
+        copy.put(compressed);
+      }
+
+      // restore offsets for exception clarity
+      seek(oldOffset);
+      throw new IOException("EOF in " + this + " while trying to read " +
+          chunkLength + " bytes");
+    }
+
     private void seek(long desired) throws IOException {
       for(int i = 0; i < bytes.length; ++i) {
         if (offsets[i] <= desired &&
             desired - offsets[i] < bytes[i].remaining()) {
           currentRange = i;
-          compressed = bytes[i].array();
-          offsetInCompressed = (int) (bytes[i].arrayOffset() +
-              bytes[i].position() + (desired - offsets[i]));
+          compressed = bytes[i].duplicate();
+          int pos = compressed.position();
+          pos += (int)(desired - offsets[i]);
+          compressed.position(pos);
           currentOffset = desired;
-          limitInCompressed = bytes[i].arrayOffset() + bytes[i].limit();
           return;
         }
       }
@@ -267,11 +320,9 @@ abstract class InStream extends InputStr
       if (segments != 0 &&
           desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
         currentRange = segments - 1;
-        compressed = bytes[currentRange].array();
-        offsetInCompressed = bytes[currentRange].arrayOffset() +
-          bytes[currentRange].limit();
+        compressed = bytes[currentRange].duplicate();
+        compressed.position(compressed.limit());
         currentOffset = desired;
-        limitInCompressed = offsetInCompressed;
         return;
       }
       throw new IOException("Seek outside of data in " + this + " to " +
@@ -294,7 +345,7 @@ abstract class InStream extends InputStr
     public String toString() {
       return "compressed stream " + name + " position: " + currentOffset +
           " length: " + length + " range: " + currentRange +
-          " offset: " + offsetInCompressed + " limit: " + limitInCompressed +
+          " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
           rangeString() +
           (uncompressed == null ? "" :
               " uncompressed: " + uncompressed.position() + " to " +

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1543947&r1=1543946&r2=1543947&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Nov 20 21:06:05 2013
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +76,7 @@ class RecordReaderImpl implements Record
   private long rowCountInStripe = 0;
   private final Map<StreamName, InStream> streams =
       new HashMap<StreamName, InStream>();
+  List<BufferChunk> bufferChunks = new ArrayList<BufferChunk>(0);
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
   private final SearchArgument sarg;
@@ -125,6 +127,7 @@ class RecordReaderImpl implements Record
         rows += stripe.getNumberOfRows();
       }
     }
+
     firstRow = skippedRows;
     totalRowCount = rows;
     reader = createTreeReader(path, 0, types, included);
@@ -2176,6 +2179,17 @@ class RecordReaderImpl implements Record
     return null;
   }
 
+  private void clearStreams() throws IOException {
+    // explicit close of all streams to de-ref ByteBuffers
+    for(InStream is: streams.values()) {
+      is.close();
+    }
+    if(bufferChunks != null) {
+      bufferChunks.clear();
+    }
+    streams.clear();
+  }
+
   /**
    * Read the current stripe into memory.
    * @throws IOException
@@ -2183,7 +2197,7 @@ class RecordReaderImpl implements Record
   private void readStripe() throws IOException {
     StripeInformation stripe = stripes.get(currentStripe);
     stripeFooter = readStripeFooter(stripe);
-    streams.clear();
+    clearStreams();
     // setup the position in the stripe
     rowCountInStripe = stripe.getNumberOfRows();
     rowInStripe = 0;
@@ -2223,28 +2237,17 @@ class RecordReaderImpl implements Record
 
   private void readAllDataStreams(StripeInformation stripe
                                   ) throws IOException {
-    byte[] buffer =
-      new byte[(int) (stripe.getDataLength())];
-    file.seek(stripe.getOffset() + stripe.getIndexLength());
-    file.readFully(buffer, 0, buffer.length);
-    int sectionOffset = 0;
-    for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
-      if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
-        int sectionLength = (int) section.getLength();
-        ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
-            sectionLength);
-        StreamName name = new StreamName(section.getColumn(),
-            section.getKind());
-        streams.put(name,
-            InStream.create(name.toString(), new ByteBuffer[]{sectionBuffer},
-                new long[]{0}, sectionLength, codec, bufferSize));
-        sectionOffset += sectionLength;
-      }
-    }
+    long start = stripe.getIndexLength();
+    long end = start + stripe.getDataLength();
+    // explicitly trigger 1 big read
+    DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)};
+    bufferChunks = readDiskRanges(file, stripe.getOffset(), Arrays.asList(ranges));
+    List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
+    createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
   }
 
   /**
-   * The secionts of stripe that we need to read.
+   * The sections of stripe that we need to read.
    */
   static class DiskRange {
     /** the first address we need to read. */
@@ -2275,6 +2278,30 @@ class RecordReaderImpl implements Record
     }
   }
 
+  /**
+   * The sections of stripe that we have read.
+   * This might not match diskRange - 1 disk range can be multiple buffer chunks, depending on DFS block boundaries.
+   */
+  static class BufferChunk {
+    final ByteBuffer chunk;
+    /** the first address we need to read. */
+    final long offset;
+    /** end of the buffer **/
+    final long end;
+
+    BufferChunk(ByteBuffer chunk, long offset) {
+      this.offset = offset;
+      this.chunk = chunk;
+      end = offset + chunk.remaining();
+    }
+
+    @Override
+    public final String toString() {
+      return "range start: " + offset + " size: " + chunk.remaining() + " type: "
+          + (chunk.isDirect() ? "direct" : "array-backed");
+    }
+  }
+
   private static final int BYTE_STREAM_POSITIONS = 1;
   private static final int RUN_LENGTH_BYTE_POSITIONS =
       BYTE_STREAM_POSITIONS + 1;
@@ -2460,17 +2487,17 @@ class RecordReaderImpl implements Record
    *    ranges
    * @throws IOException
    */
-  static byte[][] readDiskRanges(FSDataInputStream file,
+  List<BufferChunk> readDiskRanges(FSDataInputStream file,
                                  long base,
                                  List<DiskRange> ranges) throws IOException {
-    byte[][] result = new byte[ranges.size()][];
-    int i = 0;
+    ArrayList<BufferChunk> result = new ArrayList<RecordReaderImpl.BufferChunk>(ranges.size());
     for(DiskRange range: ranges) {
       int len = (int) (range.end - range.offset);
-      result[i] = new byte[len];
-      file.seek(base + range.offset);
-      file.readFully(result[i]);
-      i += 1;
+      long off = range.offset;
+      file.seek(base + off); 
+      byte[] buffer = new byte[len];
+      file.readFully(buffer, 0, buffer.length);
+      result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
     }
     return result;
   }
@@ -2509,8 +2536,7 @@ class RecordReaderImpl implements Record
   }
 
   static void createStreams(List<OrcProto.Stream> streamDescriptions,
-                            List<DiskRange> ranges,
-                            byte[][] bytes,
+                            List<BufferChunk> ranges,
                             boolean[] includeColumn,
                             CompressionCodec codec,
                             int bufferSize,
@@ -2519,13 +2545,13 @@ class RecordReaderImpl implements Record
     long offset = 0;
     for(OrcProto.Stream streamDesc: streamDescriptions) {
       int column = streamDesc.getColumn();
-      if (includeColumn[column] &&
+      if ((includeColumn == null || includeColumn[column]) &&
           StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) {
         long length = streamDesc.getLength();
         int first = -1;
         int last = -2;
-        for(int i=0; i < bytes.length; ++i) {
-          DiskRange range = ranges.get(i);
+        for(int i=0; i < ranges.size(); ++i) {
+          BufferChunk range = ranges.get(i);
           if (overlap(offset, offset+length, range.offset, range.end)) {
             if (first == -1) {
               first = i;
@@ -2536,12 +2562,24 @@ class RecordReaderImpl implements Record
         ByteBuffer[] buffers = new ByteBuffer[last - first + 1];
         long[] offsets = new long[last - first + 1];
         for(int i=0; i < buffers.length; ++i) {
-          DiskRange range = ranges.get(i + first);
+          BufferChunk range = ranges.get(i + first);
           long start = Math.max(range.offset, offset);
           long end = Math.min(range.end, offset+length);
-          buffers[i] = ByteBuffer.wrap(bytes[first + i],
-              Math.max(0, (int) (offset - range.offset)), (int) (end - start));
-          offsets[i] = Math.max(0, range.offset - offset);
+          buffers[i] = range.chunk.slice();
+          assert range.chunk.position() == 0; // otherwise we'll mix up positions
+          /*
+           * buffers are positioned in-wards if the offset > range.offset
+           * offsets[i] == range.offset - offset, except if offset > range.offset
+           */
+          if(offset > range.offset) {
+            buffers[i].position((int)(offset - range.offset));
+            buffers[i].limit((int)(end - range.offset));
+            offsets[i] = 0;
+          } else {
+            buffers[i].position(0);
+            buffers[i].limit((int)(end - range.offset));
+            offsets[i] = (range.offset - offset);
+          }
         }
         StreamName name = new StreamName(column, streamDesc.getKind());
         streams.put(name, InStream.create(name.toString(), buffers, offsets,
@@ -2565,8 +2603,8 @@ class RecordReaderImpl implements Record
     if (LOG.isDebugEnabled()) {
       LOG.debug("merge = " + stringifyDiskRanges(chunks));
     }
-    byte[][] bytes = readDiskRanges(file, stripe.getOffset(), chunks);
-    createStreams(streamList, chunks, bytes, included, codec, bufferSize,
+    bufferChunks = readDiskRanges(file, stripe.getOffset(), chunks);
+    createStreams(streamList, bufferChunks, included, codec, bufferSize,
         streams);
   }
 
@@ -2666,6 +2704,7 @@ class RecordReaderImpl implements Record
 
   @Override
   public void close() throws IOException {
+    clearStreams();
     file.close();
   }