You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/08/18 00:06:14 UTC

[2/3] hive git commit: HIVE-10535 : LLAP: move EncodedTreeReaderFactory, TreeReaderFactory bits that rely on orc.encoded, and StreamUtils if needed, to orc.encoded package (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 8295929..328e4e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -64,17 +63,12 @@ public class TreeReaderFactory {
     protected final int columnId;
     protected BitFieldReader present = null;
     protected boolean valuePresent = false;
-    protected ColumnStreamData presentStreamBuffer = null;
-    protected ColumnStreamData dataStreamBuffer = null;
-    protected ColumnStreamData dictionaryStreamBuffer = null;
-    protected ColumnStreamData lengthsStreamBuffer = null;
-    protected ColumnStreamData secondaryStreamBuffer = null;
 
     TreeReader(int columnId) throws IOException {
       this(columnId, null);
     }
 
-    TreeReader(int columnId, InStream in) throws IOException {
+    protected TreeReader(int columnId, InStream in) throws IOException {
       this.columnId = columnId;
       if (in == null) {
         present = null;
@@ -91,7 +85,7 @@ public class TreeReaderFactory {
       }
     }
 
-    IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
+    static IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
         InStream in,
         boolean signed, boolean skipCorrupt) throws IOException {
       switch (kind) {
@@ -136,37 +130,6 @@ public class TreeReaderFactory {
       }
     }
 
-    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
-        throws IOException {
-      // stream buffers are arranged in enum order of stream kind
-      for (ColumnStreamData streamBuffer : buffers) {
-        switch (streamBuffer.getStreamKind()) {
-          case 0:
-            // PRESENT stream
-            presentStreamBuffer = streamBuffer;
-            break;
-          case 1:
-            // DATA stream
-            dataStreamBuffer = streamBuffer;
-            break;
-          case 2:
-            // LENGTH stream
-            lengthsStreamBuffer = streamBuffer;
-            break;
-          case 3:
-            // DICTIONARY_DATA stream
-            dictionaryStreamBuffer = streamBuffer;
-            break;
-          case 5:
-            // SECONDARY stream
-            secondaryStreamBuffer = streamBuffer;
-            break;
-          default:
-            throw new IOException("Unexpected stream kind: " + streamBuffer.getStreamKind());
-        }
-      }
-    }
-
     protected long countNonNulls(long rows) throws IOException {
       if (present != null) {
         long result = 0;
@@ -222,16 +185,20 @@ public class TreeReaderFactory {
       }
       return previousVector;
     }
+
+    public BitFieldReader getPresent() {
+      return present;
+    }
   }
 
-  protected static class BooleanTreeReader extends TreeReader {
+  public static class BooleanTreeReader extends TreeReader {
     protected BitFieldReader reader = null;
 
     BooleanTreeReader(int columnId) throws IOException {
       this(columnId, null, null);
     }
 
-    BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException {
+    protected BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException {
       super(columnId, present);
       if (data != null) {
         reader = new BitFieldReader(data, 1);
@@ -296,14 +263,14 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class ByteTreeReader extends TreeReader {
+  public static class ByteTreeReader extends TreeReader {
     protected RunLengthByteReader reader = null;
 
     ByteTreeReader(int columnId) throws IOException {
       this(columnId, null, null);
     }
 
-    ByteTreeReader(int columnId, InStream present, InStream data) throws IOException {
+    protected ByteTreeReader(int columnId, InStream present, InStream data) throws IOException {
       super(columnId, present);
       this.reader = new RunLengthByteReader(data);
     }
@@ -366,14 +333,14 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class ShortTreeReader extends TreeReader {
+  public static class ShortTreeReader extends TreeReader {
     protected IntegerReader reader = null;
 
     ShortTreeReader(int columnId) throws IOException {
       this(columnId, null, null, null);
     }
 
-    ShortTreeReader(int columnId, InStream present, InStream data,
+    protected ShortTreeReader(int columnId, InStream present, InStream data,
         OrcProto.ColumnEncoding encoding)
         throws IOException {
       super(columnId, present);
@@ -452,14 +419,14 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class IntTreeReader extends TreeReader {
+  public static class IntTreeReader extends TreeReader {
     protected IntegerReader reader = null;
 
     IntTreeReader(int columnId) throws IOException {
       this(columnId, null, null, null);
     }
 
-    IntTreeReader(int columnId, InStream present, InStream data,
+    protected IntTreeReader(int columnId, InStream present, InStream data,
         OrcProto.ColumnEncoding encoding)
         throws IOException {
       super(columnId, present);
@@ -538,14 +505,14 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class LongTreeReader extends TreeReader {
+  public static class LongTreeReader extends TreeReader {
     protected IntegerReader reader = null;
 
     LongTreeReader(int columnId, boolean skipCorrupt) throws IOException {
       this(columnId, null, null, null, skipCorrupt);
     }
 
-    LongTreeReader(int columnId, InStream present, InStream data,
+    protected LongTreeReader(int columnId, InStream present, InStream data,
         OrcProto.ColumnEncoding encoding,
         boolean skipCorrupt)
         throws IOException {
@@ -625,7 +592,7 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class FloatTreeReader extends TreeReader {
+  public static class FloatTreeReader extends TreeReader {
     protected InStream stream;
     private final SerializationUtils utils;
 
@@ -633,7 +600,7 @@ public class TreeReaderFactory {
       this(columnId, null, null);
     }
 
-    FloatTreeReader(int columnId, InStream present, InStream data) throws IOException {
+    protected FloatTreeReader(int columnId, InStream present, InStream data) throws IOException {
       super(columnId, present);
       this.utils = new SerializationUtils();
       this.stream = data;
@@ -737,7 +704,7 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class DoubleTreeReader extends TreeReader {
+  public static class DoubleTreeReader extends TreeReader {
     protected InStream stream;
     private final SerializationUtils utils;
 
@@ -745,7 +712,7 @@ public class TreeReaderFactory {
       this(columnId, null, null);
     }
 
-    DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException {
+    protected DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException {
       super(columnId, present);
       this.utils = new SerializationUtils();
       this.stream = data;
@@ -852,7 +819,7 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class BinaryTreeReader extends TreeReader {
+  public static class BinaryTreeReader extends TreeReader {
     protected InStream stream;
     protected IntegerReader lengths = null;
     protected final LongColumnVector scratchlcv;
@@ -861,7 +828,7 @@ public class TreeReaderFactory {
       this(columnId, null, null, null, null);
     }
 
-    BinaryTreeReader(int columnId, InStream present, InStream data, InStream length,
+    protected BinaryTreeReader(int columnId, InStream present, InStream data, InStream length,
         OrcProto.ColumnEncoding encoding) throws IOException {
       super(columnId, present);
       scratchlcv = new LongColumnVector();
@@ -959,7 +926,7 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class TimestampTreeReader extends TreeReader {
+  public static class TimestampTreeReader extends TreeReader {
     protected IntegerReader data = null;
     protected IntegerReader nanos = null;
     private final boolean skipCorrupt;
@@ -973,7 +940,7 @@ public class TreeReaderFactory {
       this(columnId, null, null, null, null, skipCorrupt);
     }
 
-    TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream,
+    protected TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream,
         InStream nanosStream, OrcProto.ColumnEncoding encoding, boolean skipCorrupt)
         throws IOException {
       super(columnId, presentStream);
@@ -1144,14 +1111,14 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class DateTreeReader extends TreeReader {
+  public static class DateTreeReader extends TreeReader {
     protected IntegerReader reader = null;
 
     DateTreeReader(int columnId) throws IOException {
       this(columnId, null, null, null);
     }
 
-    DateTreeReader(int columnId, InStream present, InStream data,
+    protected DateTreeReader(int columnId, InStream present, InStream data,
         OrcProto.ColumnEncoding encoding) throws IOException {
       super(columnId, present);
       if (data != null && encoding != null) {
@@ -1229,7 +1196,7 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class DecimalTreeReader extends TreeReader {
+  public static class DecimalTreeReader extends TreeReader {
     protected InStream valueStream;
     protected IntegerReader scaleReader = null;
     private LongColumnVector scratchScaleVector;
@@ -1241,7 +1208,7 @@ public class TreeReaderFactory {
       this(columnId, precision, scale, null, null, null, null);
     }
 
-    DecimalTreeReader(int columnId, int precision, int scale, InStream present,
+    protected DecimalTreeReader(int columnId, int precision, int scale, InStream present,
         InStream valueStream, InStream scaleStream, OrcProto.ColumnEncoding encoding)
         throws IOException {
       super(columnId, present);
@@ -1363,14 +1330,14 @@ public class TreeReaderFactory {
    * stripe, it creates an internal reader based on whether a direct or
    * dictionary encoding was used.
    */
-  protected static class StringTreeReader extends TreeReader {
+  public static class StringTreeReader extends TreeReader {
     protected TreeReader reader;
 
     StringTreeReader(int columnId) throws IOException {
       super(columnId);
     }
 
-    StringTreeReader(int columnId, InStream present, InStream data, InStream length,
+    protected StringTreeReader(int columnId, InStream present, InStream data, InStream length,
         InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
       super(columnId, present);
       if (encoding != null) {
@@ -1521,7 +1488,7 @@ public class TreeReaderFactory {
    * A reader for string columns that are direct encoded in the current
    * stripe.
    */
-  protected static class StringDirectTreeReader extends TreeReader {
+  public static class StringDirectTreeReader extends TreeReader {
     protected InStream stream;
     protected IntegerReader lengths;
     private final LongColumnVector scratchlcv;
@@ -1530,8 +1497,8 @@ public class TreeReaderFactory {
       this(columnId, null, null, null, null);
     }
 
-    StringDirectTreeReader(int columnId, InStream present, InStream data, InStream length,
-        OrcProto.ColumnEncoding.Kind encoding) throws IOException {
+    protected StringDirectTreeReader(int columnId, InStream present, InStream data,
+        InStream length, OrcProto.ColumnEncoding.Kind encoding) throws IOException {
       super(columnId, present);
       this.scratchlcv = new LongColumnVector();
       this.stream = data;
@@ -1628,13 +1595,21 @@ public class TreeReaderFactory {
         lengthToSkip -= stream.skip(lengthToSkip);
       }
     }
+
+    public IntegerReader getLengths() {
+      return lengths;
+    }
+
+    public InStream getStream() {
+      return stream;
+    }
   }
 
   /**
    * A reader for string columns that are dictionary encoded in the current
    * stripe.
    */
-  protected static class StringDictionaryTreeReader extends TreeReader {
+  public static class StringDictionaryTreeReader extends TreeReader {
     private DynamicByteArray dictionaryBuffer;
     private int[] dictionaryOffsets;
     protected IntegerReader reader;
@@ -1646,7 +1621,7 @@ public class TreeReaderFactory {
       this(columnId, null, null, null, null, null);
     }
 
-    StringDictionaryTreeReader(int columnId, InStream present, InStream data,
+    protected StringDictionaryTreeReader(int columnId, InStream present, InStream data,
         InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding)
         throws IOException {
       super(columnId, present);
@@ -1839,16 +1814,20 @@ public class TreeReaderFactory {
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
+
+    public IntegerReader getReader() {
+      return reader;
+    }
   }
 
-  protected static class CharTreeReader extends StringTreeReader {
+  public static class CharTreeReader extends StringTreeReader {
     int maxLength;
 
     CharTreeReader(int columnId, int maxLength) throws IOException {
       this(columnId, maxLength, null, null, null, null, null);
     }
 
-    CharTreeReader(int columnId, int maxLength, InStream present, InStream data,
+    protected CharTreeReader(int columnId, int maxLength, InStream present, InStream data,
         InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
       super(columnId, present, data, length, dictionary, encoding);
       this.maxLength = maxLength;
@@ -1915,14 +1894,14 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class VarcharTreeReader extends StringTreeReader {
+  public static class VarcharTreeReader extends StringTreeReader {
     int maxLength;
 
     VarcharTreeReader(int columnId, int maxLength) throws IOException {
       this(columnId, maxLength, null, null, null, null, null);
     }
 
-    VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data,
+    protected VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data,
         InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
       super(columnId, present, data, length, dictionary, encoding);
       this.maxLength = maxLength;
@@ -1987,11 +1966,11 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class StructTreeReader extends TreeReader {
+  public static class StructTreeReader extends TreeReader {
     protected final TreeReader[] fields;
     private final String[] fieldNames;
 
-    StructTreeReader(int columnId,
+    protected StructTreeReader(int columnId,
         List<OrcProto.Type> types,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
@@ -2090,11 +2069,11 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class UnionTreeReader extends TreeReader {
+  public static class UnionTreeReader extends TreeReader {
     protected final TreeReader[] fields;
     protected RunLengthByteReader tags;
 
-    UnionTreeReader(int columnId,
+    protected UnionTreeReader(int columnId,
         List<OrcProto.Type> types,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
@@ -2170,11 +2149,11 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class ListTreeReader extends TreeReader {
+  public static class ListTreeReader extends TreeReader {
     protected final TreeReader elementReader;
     protected IntegerReader lengths = null;
 
-    ListTreeReader(int columnId,
+    protected ListTreeReader(int columnId,
         List<OrcProto.Type> types,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
@@ -2259,12 +2238,12 @@ public class TreeReaderFactory {
     }
   }
 
-  protected static class MapTreeReader extends TreeReader {
+  public static class MapTreeReader extends TreeReader {
     protected final TreeReader keyReader;
     protected final TreeReader valueReader;
     protected IntegerReader lengths = null;
 
-    MapTreeReader(int columnId,
+    protected MapTreeReader(int columnId,
         List<OrcProto.Type> types,
         boolean[] included,
         boolean skipCorrupt) throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
index b3c9169..00dcc15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
@@ -35,7 +35,7 @@ public class CacheChunk extends DiskRangeList {
 
   @Override
   public ByteBuffer getData() {
-    // Callers duplicate the buffer, they have to for BufferChunk
+    // Callers duplicate the buffer, they have to for BufferChunk; so we don't have to.
     return buffer.getByteBufferRaw();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index ce503d9..b1477e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -348,7 +348,7 @@ class EncodedReaderImpl implements EncodedReader {
         ColumnReadContext ctx = colCtxs[colIxMod];
         RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
             nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
-        ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
+        ecb.initColumn(colIxMod, ctx.colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
         for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
           StreamContext sctx = ctx.streams[streamIx];
           ColumnStreamData cb = null;
@@ -360,7 +360,6 @@ class EncodedReaderImpl implements EncodedReader {
             }
             if (sctx.stripeLevelStream == null) {
               sctx.stripeLevelStream = POOLS.csdPool.take();
-              sctx.stripeLevelStream.init(sctx.kind.getNumber());
               // We will be using this for each RG while also sending RGs to processing.
               // To avoid buffers being unlocked, run refcount one ahead; we will not increase
               // it when building the last RG, so each RG processing will decref once, and the
@@ -399,7 +398,7 @@ class EncodedReaderImpl implements EncodedReader {
               sctx.bufferIter = iter = lastCached;
             }
           }
-          ecb.setStreamData(colIxMod, streamIx, cb);
+          ecb.setStreamData(colIxMod, sctx.kind.getNumber(), cb);
         }
       }
       if (isRGSelected) {
@@ -431,9 +430,7 @@ class EncodedReaderImpl implements EncodedReader {
 
   private ColumnStreamData createRgColumnStreamData(int rgIx, boolean isLastRg,
       int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) {
-    ColumnStreamData cb;
-    cb = POOLS.csdPool.take();
-    cb.init(sctx.kind.getNumber());
+    ColumnStreamData cb = POOLS.csdPool.take();
     cb.incRef();
     if (isDebugTracingEnabled) {
       LOG.info("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "")