You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/08/14 23:22:03 UTC

hive git commit: HIVE-11546: Projected columns read size should be scaled to split size for ORC Splits (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master cf0481fcf -> e8b2c605a


HIVE-11546: Projected columns read size should be scaled to split size for ORC Splits (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e8b2c605
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e8b2c605
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e8b2c605

Branch: refs/heads/master
Commit: e8b2c605a05d06c1bfcd4c6bc611bc7f83306b38
Parents: cf0481f
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Fri Aug 14 14:21:51 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Fri Aug 14 14:21:51 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 30 ++++++-
 .../apache/hadoop/hive/ql/io/orc/Reader.java    |  7 ++
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       |  5 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 95 ++++++++++++++++++--
 4 files changed, 125 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e8b2c605/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fe2eccd..6ed7872 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -717,6 +717,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     private ReaderImpl.FileMetaInfo fileMetaInfo;
     private Metadata metadata;
     private List<OrcProto.Type> types;
+    private boolean[] includedCols;
     private final boolean isOriginal;
     private final List<DeltaMetaData> deltas;
     private final boolean hasBase;
@@ -830,8 +831,14 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
         hosts = new String[hostList.size()];
         hostList.toArray(hosts);
       }
+
+      // scale the raw data size to split level based on ratio of split wrt to file length
+      final long fileLen = file.getLen();
+      final double splitRatio = (double) length / (double) fileLen;
+      final long scaledProjSize = projColsUncompressedSize > 0 ?
+          (long) (splitRatio * projColsUncompressedSize) : fileLen;
       return new OrcSplit(file.getPath(), offset, length, hosts, fileMetaInfo,
-          isOriginal, hasBase, deltas, projColsUncompressedSize);
+          isOriginal, hasBase, deltas, scaledProjSize);
     }
 
     /**
@@ -845,11 +852,12 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
 
       // figure out which stripes we need to read
       boolean[] includeStripe = null;
+
       // we can't eliminate stripes if there are deltas because the
       // deltas may change the rows making them match the predicate.
       if (deltas.isEmpty()) {
         Reader.Options options = new Reader.Options();
-        options.include(genIncludedColumns(types, context.conf, isOriginal));
+        options.include(includedCols);
         setSearchArgument(options, types, context.conf, isOriginal);
         // only do split pruning if HIVE-8732 has been fixed in the writer
         if (options.getSearchArgument() != null &&
@@ -930,8 +938,6 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     private void populateAndCacheStripeDetails() throws IOException {
       Reader orcReader = OrcFile.createReader(file.getPath(),
           OrcFile.readerOptions(context.conf).filesystem(fs));
-      List<String> projCols = ColumnProjectionUtils.getReadColumnNames(context.conf);
-      // TODO: produce projColsUncompressedSize from projCols
       if (fileInfo != null) {
         stripes = fileInfo.stripeInfos;
         fileMetaInfo = fileInfo.fileMetaInfo;
@@ -959,6 +965,22 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
                   metadata, types, fileMetaInfo, writerVersion));
         }
       }
+      includedCols = genIncludedColumns(types, context.conf, isOriginal);
+      projColsUncompressedSize = computeProjectionSize(orcReader, includedCols, isOriginal);
+    }
+
+    private long computeProjectionSize(final Reader orcReader, final boolean[] includedCols,
+        final boolean isOriginal) {
+      final int rootIdx = getRootColumn(isOriginal);
+      List<Integer> internalColIds = Lists.newArrayList();
+      if (includedCols != null) {
+        for (int i = 0; i < includedCols.length; i++) {
+          if (includedCols[i]) {
+            internalColIds.add(rootIdx + i);
+          }
+        }
+      }
+      return orcReader.getRawDataSizeFromColIndices(internalColIds);
     }
 
     private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics,

http://git-wip-us.apache.org/repos/asf/hive/blob/e8b2c605/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 6f4f013..7bddefc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -52,6 +52,13 @@ public interface Reader {
   long getRawDataSizeOfColumns(List<String> colNames);
 
   /**
+   * Get the deserialized data size of the specified columns ids
+   * @param colIds - internal column id (check orcfiledump for column ids)
+   * @return raw data size of columns
+   */
+  long getRawDataSizeFromColIndices(List<Integer> colIds);
+
+  /**
    * Get the user metadata keys.
    * @return the set of metadata keys
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/e8b2c605/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 3c0de3c..a6448b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -575,7 +575,8 @@ public class ReaderImpl implements Reader {
     return deserializedSize;
   }
 
-  private long getRawDataSizeFromColIndices(List<Integer> colIndices) {
+  @Override
+  public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
     long result = 0;
     for (int colIdx : colIndices) {
       result += getRawDataSizeOfColumn(colIdx);
@@ -620,7 +621,7 @@ public class ReaderImpl implements Reader {
     case BYTE:
       return numVals * JavaDataModel.get().primitive1();
     default:
-      LOG.debug("Unknown primitive category.");
+      LOG.debug("Unknown primitive category: " + type.getKind());
       break;
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e8b2c605/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 6cb8529..0c12c89 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Output;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -108,6 +105,9 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+
 public class TestInputOutputFormat {
 
   public static String toKryo(SearchArgument sarg) {
@@ -902,14 +902,25 @@ public class TestInputOutputFormat {
     }
     fill(buffer, offset);
     footer.addTypes(OrcProto.Type.newBuilder()
-                     .setKind(OrcProto.Type.Kind.STRUCT)
-                     .addFieldNames("col1")
-                     .addSubtypes(1));
+        .setKind(OrcProto.Type.Kind.STRUCT)
+        .addFieldNames("col1")
+        .addSubtypes(1));
     footer.addTypes(OrcProto.Type.newBuilder()
         .setKind(OrcProto.Type.Kind.STRING));
     footer.setNumberOfRows(1000 * stripeLengths.length)
           .setHeaderLength(headerLen)
           .setContentLength(offset - headerLen);
+    footer.addStatistics(OrcProto.ColumnStatistics.newBuilder()
+        .setNumberOfValues(1000 * stripeLengths.length).build());
+    footer.addStatistics(OrcProto.ColumnStatistics.newBuilder()
+        .setNumberOfValues(1000 * stripeLengths.length)
+        .setStringStatistics(
+            OrcProto.StringStatistics.newBuilder()
+                .setMaximum("zzz")
+                .setMinimum("aaa")
+                .setSum(1000 * 3 * stripeLengths.length)
+                .build()
+        ).build());
     footer.build().writeTo(buffer);
     int footerEnd = buffer.getLength();
     OrcProto.PostScript ps =
@@ -1013,6 +1024,78 @@ public class TestInputOutputFormat {
   }
 
   @Test
+  public void testProjectedColumnSize() throws Exception {
+    long[] stripeSizes =
+        new long[]{200, 200, 200, 200, 100};
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/file", 500,
+            createMockOrcFile(stripeSizes),
+            new MockBlock("host1-1", "host1-2", "host1-3"),
+            new MockBlock("host2-1", "host0", "host2-3"),
+            new MockBlock("host0", "host3-2", "host3-3"),
+            new MockBlock("host4-1", "host4-2", "host4-3"),
+            new MockBlock("host5-1", "host5-2", "host5-3")));
+    conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 300);
+    conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 200);
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    OrcInputFormat.SplitGenerator splitter =
+        new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
+            fs.getFileStatus(new Path("/a/file")), null, true,
+            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
+    List<OrcSplit> results = splitter.call();
+    OrcSplit result = results.get(0);
+    assertEquals(3, results.size());
+    assertEquals(3, result.getStart());
+    assertEquals(400, result.getLength());
+    assertEquals(167468, result.getProjectedColumnsUncompressedSize());
+    result = results.get(1);
+    assertEquals(403, result.getStart());
+    assertEquals(400, result.getLength());
+    assertEquals(167468, result.getProjectedColumnsUncompressedSize());
+    result = results.get(2);
+    assertEquals(803, result.getStart());
+    assertEquals(100, result.getLength());
+    assertEquals(41867, result.getProjectedColumnsUncompressedSize());
+
+    // test min = 0, max = 0 generates each stripe
+    conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 0);
+    conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
+    context = new OrcInputFormat.Context(conf);
+    splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
+        fs.getFileStatus(new Path("/a/file")), null, true,
+        new ArrayList<AcidInputFormat.DeltaMetaData>(),
+        true, null, null));
+    results = splitter.call();
+    assertEquals(5, results.size());
+    for (int i = 0; i < stripeSizes.length; ++i) {
+      assertEquals("checking stripe " + i + " size",
+          stripeSizes[i], results.get(i).getLength());
+      if (i == stripeSizes.length - 1) {
+        assertEquals(41867, results.get(i).getProjectedColumnsUncompressedSize());
+      } else {
+        assertEquals(83734, results.get(i).getProjectedColumnsUncompressedSize());
+      }
+    }
+
+    // single split
+    conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 100000);
+    conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 1000);
+    context = new OrcInputFormat.Context(conf);
+    splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
+        fs.getFileStatus(new Path("/a/file")), null, true,
+        new ArrayList<AcidInputFormat.DeltaMetaData>(),
+        true, null, null));
+    results = splitter.call();
+    assertEquals(1, results.size());
+    result = results.get(0);
+    assertEquals(3, result.getStart());
+    assertEquals(900, result.getLength());
+    assertEquals(376804, result.getProjectedColumnsUncompressedSize());
+  }
+
+  @Test
   @SuppressWarnings("unchecked,deprecation")
   public void testInOutFormat() throws Exception {
     Properties properties = new Properties();