You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2018/05/15 22:40:47 UTC

[27/50] [abbrv] hive git commit: HIVE-19479 : encoded stream seek is incorrect for 0-length RGs in LLAP IO (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

HIVE-19479 : encoded stream seek is incorrect for 0-length RGs in LLAP IO (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e941bea8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e941bea8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e941bea8

Branch: refs/heads/branch-3.0.0
Commit: e941bea80393e74efd64b07355b2e0fac384f7cc
Parents: 9ebb2ff
Author: sergey <se...@apache.org>
Authored: Fri May 11 12:01:10 2018 -0700
Committer: Vineet Garg <vg...@apache.org>
Committed: Sun May 13 21:03:44 2018 -0700

----------------------------------------------------------------------
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   3 +
 .../ql/io/orc/encoded/EncodedReaderImpl.java    |  10 +-
 .../orc/encoded/EncodedTreeReaderFactory.java   | 118 ++++++++++---------
 3 files changed, 70 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e941bea8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index fc0c66a..05282db 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -296,6 +296,9 @@ public class OrcEncodedDataConsumer
       ConsumerStripeMetadata stripeMetadata) throws IOException {
     PositionProvider[] pps = createPositionProviders(
         columnReaders, batch.getBatchKey(), stripeMetadata);
+    if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.ORC_LOGGER.trace("Created pps {}", Arrays.toString(pps));
+    }
     if (pps == null) return;
     for (int i = 0; i < columnReaders.length; i++) {
       TreeReader reader = columnReaders[i];

http://git-wip-us.apache.org/repos/asf/hive/blob/e941bea8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 1d7eceb..348f9df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -435,12 +435,12 @@ class EncodedReaderImpl implements EncodedReader {
               try {
                 if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == null) {
                   // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
-                  if (isTracingEnabled) {
-                    LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
-                        + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
-                  }
-                  trace.logStartStripeStream(sctx.kind);
                   if (sctx.stripeLevelStream == null) {
+                    if (isTracingEnabled) {
+                      LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+                          + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
+                    }
+                    trace.logStartStripeStream(sctx.kind);
                     sctx.stripeLevelStream = POOLS.csdPool.take();
                     // We will be using this for each RG while also sending RGs to processing.
                     // To avoid buffers being unlocked, run refcount one ahead; so each RG 

http://git-wip-us.apache.org/repos/asf/hive/blob/e941bea8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index 42532f9..646b214 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.TypeDescription.Category;
+import org.apache.orc.impl.InStream;
 import org.apache.orc.impl.PositionProvider;
 import org.apache.orc.impl.SettableUncompressedStream;
 import org.apache.orc.impl.TreeReaderFactory;
@@ -213,6 +214,11 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
   }
 
+  private static void skipCompressedIndex(boolean isCompressed, PositionProvider index) {
+    if (!isCompressed) return; // Not compressed: leave the position provider untouched.
+    index.getNext(); // Compressed: consume one position from the provider and discard it.
+  }
+
   protected static class StringStreamReader extends StringTreeReader
       implements SettableTreeReader {
     private boolean _isFileCompressed;
@@ -260,30 +266,30 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
         if (_dataStream != null && _dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDictionaryTreeReader) reader).getReader().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       } else {
         // DIRECT encoding
 
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
+        // TODO: why does the original code not just use _dataStream that it passes in as stream?
+        InStream stream = ((StringDirectTreeReader) reader).getStream();
+        // TODO: not clear why this check and skipSeek are needed.
         if (_dataStream != null && _dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
-          ((StringDirectTreeReader) reader).getStream().seek(index);
+          stream.seek(index);
+        } else {
+          assert stream == _dataStream;
+          skipSeek(index);
         }
 
+        skipCompressedIndex(_isFileCompressed, index);
         if (_lengthStream != null && _lengthStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDirectTreeReader) reader).getLengths().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       }
     }
 
@@ -830,10 +836,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
       // data stream could be empty stream or already reached end of stream before present stream.
       // This can happen if all values in stream are nulls or last row group values are all null.
+      skipCompressedIndex(_isFileCompressed, index);
       if (_dataStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         stream.seek(index);
       }
     }
@@ -945,10 +949,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
       // data stream could be empty stream or already reached end of stream before present stream.
       // This can happen if all values in stream are nulls or last row group values are all null.
+      skipCompressedIndex(_isFileCompressed, index);
       if (_dataStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         stream.seek(index);
       }
     }
@@ -1071,19 +1073,19 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
       // data stream could be empty stream or already reached end of stream before present stream.
       // This can happen if all values in stream are nulls or last row group values are all null.
+      skipCompressedIndex(_isFileCompressed, index);
+      // TODO: not clear why this check and skipSeek are needed.
       if (_valueStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         valueStream.seek(index);
+      } else {
+        assert valueStream == _valueStream;
+        skipSeek(index);
       }
 
+      skipCompressedIndex(_isFileCompressed, index);
       if (_scaleStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         scaleReader.seek(index);
-      }
+      } // No need to skip seek here, index won't be used anymore.
     }
 
     @Override
@@ -1375,30 +1377,29 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
         if (_dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDictionaryTreeReader) reader).getReader().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       } else {
         // DIRECT encoding
 
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
+        InStream stream = ((StringDirectTreeReader) reader).getStream();
+        // TODO: not clear why this check and skipSeek are needed.
         if (_dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
-          ((StringDirectTreeReader) reader).getStream().seek(index);
+          stream.seek(index);
+        } else {
+          assert stream == _dataStream;
+          skipSeek(index);
         }
 
+        skipCompressedIndex(_isFileCompressed, index);
         if (_lengthStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDirectTreeReader) reader).getLengths().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       }
     }
 
@@ -1574,30 +1575,29 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
         if (_dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDictionaryTreeReader) reader).getReader().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       } else {
         // DIRECT encoding
 
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
+        InStream stream = ((StringDirectTreeReader) reader).getStream();
+        // TODO: not clear why this check and skipSeek are needed.
         if (_dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
-          ((StringDirectTreeReader) reader).getStream().seek(index);
+          stream.seek(index);
+        } else {
+          assert stream == _dataStream;
+          skipSeek(index);
         }
 
+        skipCompressedIndex(_isFileCompressed, index);
         if (_lengthStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDirectTreeReader) reader).getLengths().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       }
     }
 
@@ -1885,19 +1885,19 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
       // data stream could be empty stream or already reached end of stream before present stream.
       // This can happen if all values in stream are nulls or last row group values are all null.
+      skipCompressedIndex(_isFileCompressed, index);
+      // TODO: not clear why this check and skipSeek are needed.
       if (_dataStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         stream.seek(index);
+      } else {
+        assert stream == _dataStream;
+        skipSeek(index);
       }
 
+      skipCompressedIndex(_isFileCompressed, index);
       if (lengths != null && _lengthsStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         lengths.seek(index);
-      }
+      } // No need to skip seek here, index won't be used anymore.
     }
 
     @Override
@@ -2132,6 +2132,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
   }
 
 
+  private static void skipSeek(PositionProvider index) {
+    // Must be consistent with uncompressed stream seek in ORC. See call site comments.
+    index.getNext(); // Consume and discard one position in place of the seek the caller skipped.
+  }
+
+
   private static TreeReader createEncodedTreeReader(TypeDescription schema,
       List<OrcProto.ColumnEncoding> encodings, OrcEncodedColumnBatch batch,
       CompressionCodec codec, TreeReaderFactory.Context context) throws IOException {