You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/07/31 02:43:06 UTC

[06/43] hive git commit: HIVE-11320 ACID enable predicate pushdown for insert-only delta file (Eugene Koifman, reviewed by Alan Gates)

HIVE-11320 ACID enable predicate pushdown for insert-only delta file (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/99041624
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/99041624
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/99041624

Branch: refs/heads/spark
Commit: 990416249833e722ca8a32dd9dd425883da0caaf
Parents: 6ec72de
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Jul 21 11:42:14 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Jul 21 11:42:14 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/io/orc/OrcRawRecordMerger.java      | 20 ++++--
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 68 ++++++++++++++++++--
 2 files changed, 75 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/99041624/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 2f11611..58b85ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -478,10 +478,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     // we always want to read all of the deltas
     eventOptions.range(0, Long.MAX_VALUE);
-    // Turn off the sarg before pushing it to delta.  We never want to push a sarg to a delta as
-    // it can produce wrong results (if the latest valid version of the record is filtered out by
-    // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
-    eventOptions.searchArgument(null, null);
     if (deltaDirectory != null) {
       for(Path delta: deltaDirectory) {
         ReaderKey key = new ReaderKey();
@@ -492,8 +488,20 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         if (length != -1 && fs.exists(deltaFile)) {
           Reader deltaReader = OrcFile.createReader(deltaFile,
               OrcFile.readerOptions(conf).maxLength(length));
-          ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
-            maxKey, eventOptions, deltaDir.getStatementId());
+          Reader.Options deltaEventOptions = null;
+          if(eventOptions.getSearchArgument() != null) {
+            // Turn off the sarg before pushing it to delta.  We never want to push a sarg to a delta as
+            // it can produce wrong results (if the latest valid version of the record is filtered out by
+            // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
+            // unless the delta only has insert events
+            OrcRecordUpdater.AcidStats acidStats = OrcRecordUpdater.parseAcidStats(deltaReader);
+            if(acidStats.deletes > 0 || acidStats.updates > 0) {
+              deltaEventOptions = eventOptions.clone().searchArgument(null, null);
+            }
+          }
+          ReaderPair deltaPair;
+          deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
+            maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
           if (deltaPair.nextRecord != null) {
             readers.put(key, deltaPair);
           }

http://git-wip-us.apache.org/repos/asf/hive/blob/99041624/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 33ca998..57e4fb9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -20,13 +20,11 @@ package org.apache.hadoop.hive.ql;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.orc.FileDump;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -36,13 +34,11 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * TODO: this should be merged with TestTxnCommands once that is checked in
@@ -55,7 +51,7 @@ public class TestTxnCommands2 {
   ).getPath().replaceAll("\\\\", "/");
   private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
   //bucket count for test tables; set it to 1 for easier debugging
-  private static int BUCKET_COUNT = 2;
+  private static int BUCKET_COUNT = 1;
   @Rule
   public TestName testName = new TestName();
   private HiveConf hiveConf;
@@ -122,6 +118,64 @@ public class TestTxnCommands2 {
       FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
     }
   }
+  @Test
+  public void testOrcPPD() throws Exception  {
+    testOrcPPD(true);
+  }
+  @Test
+  public void testOrcNoPPD() throws Exception {
+    testOrcPPD(false);
+  }
+  private void testOrcPPD(boolean enablePPD) throws Exception  {
+    boolean originalPpd = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, enablePPD);//enables ORC PPD
+    int[][] tableData = {{1,2},{3,4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean();
+    AtomicBoolean looped = new AtomicBoolean();
+    stop.set(true);
+    t.init(stop, looped);
+    t.run();
+    //now we have base_0001 file
+    int[][] tableData2 = {{1,7},{5,6},{7,8},{9,10}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
+    //now we have delta_0002_0002_0000 with inserts only (ok to push predicate)
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a=7 and b=8");
+    //now we have delta_0003_0003_0000 with delete events (can't push predicate)
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 11 where a = 9");
+    //and another delta with update op
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b");
+    int [][] resultData = {{3,4},{5,6},{9,11}};
+    Assert.assertEquals("Update failed", stringifyValues(resultData), rs1);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, originalPpd);
+  }
+  @Ignore("alter table")
+  @Test
+  public void testAlterTable() throws Exception {
+    int[][] tableData = {{1,2}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean();
+    AtomicBoolean looped = new AtomicBoolean();
+    stop.set(true);
+    t.init(stop, looped);
+    t.run();
+    int[][] tableData2 = {{5,6}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where b > 0 order by a,b");
+
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " add columns(c int)");
+    int[][] moreTableData = {{7,8,9}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b,c) " + makeValuesClause(moreTableData));
+    List<String> rs0 = runStatementOnDriver("select a,b,c from " + Table.ACIDTBL + " where a > 0 order by a,b,c");
+  }
   @Ignore("not needed but useful for testing")
   @Test
   public void testNonAcidInsert() throws Exception {