You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/09/09 09:08:45 UTC
[15/50] [abbrv] hive git commit: HIVE-11357 ACID enable predicate
pushdown for insert-only delta file 2 (Eugene Koifman, reviewed by Alan Gates)
HIVE-11357 ACID enable predicate pushdown for insert-only delta file 2 (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed4517cf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed4517cf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed4517cf
Branch: refs/heads/beeline-cli
Commit: ed4517cfb14b90d03f3cf33d653827bec90bcb98
Parents: 8e712da
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Aug 28 12:19:32 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Aug 28 12:19:32 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 18 ++++-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 85 ++++++++++++++++----
2 files changed, 88 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4517cf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fd6d2ad..8c138b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -127,7 +127,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
/**
* When picking the hosts for a split that crosses block boundaries,
- * any drop any host that has fewer than MIN_INCLUDED_LOCATION of the
+ * drop any host that has fewer than MIN_INCLUDED_LOCATION of the
* number of bytes available on the host with the most.
* If host1 has 10MB of the split, host2 has 20MB, and host3 has 18MB the
* split will contain host2 (100% of host2) and host3 (90% of host2). Host1
@@ -1283,6 +1283,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
} else {
bucket = (int) split.getStart();
reader = null;
+ if(deltas != null && deltas.length > 0) {
+ Path bucketPath = AcidUtils.createBucketFile(deltas[0], bucket);
+ OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+ FileSystem fs = readerOptions.getFilesystem();
+ if(fs == null) {
+ fs = path.getFileSystem(options.getConfiguration());
+ }
+ if(fs.exists(bucketPath)) {
+ /* w/o schema evolution (which ACID doesn't support yet) all delta
+ files have the same schema, so choosing the 1st one*/
+ final List<OrcProto.Type> types =
+ OrcFile.createReader(bucketPath, readerOptions).getTypes();
+ readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
+ setSearchArgument(readOptions, types, conf, split.isOriginal());
+ }
+ }
}
String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
Long.MAX_VALUE + ":");
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4517cf/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 58c2fca..5aa2500 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
@@ -51,7 +52,7 @@ public class TestTxnCommands2 {
).getPath().replaceAll("\\\\", "/");
private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
//bucket count for test tables; set it to 1 for easier debugging
- private static int BUCKET_COUNT = 1;
+ private static int BUCKET_COUNT = 2;
@Rule
public TestName testName = new TestName();
private HiveConf hiveConf;
@@ -107,7 +108,6 @@ public class TestTxnCommands2 {
public void tearDown() throws Exception {
try {
if (d != null) {
- // runStatementOnDriver("set autocommit true");
dropTables();
d.destroy();
d.close();
@@ -126,13 +126,51 @@ public class TestTxnCommands2 {
public void testOrcNoPPD() throws Exception {
testOrcPPD(false);
}
- private void testOrcPPD(boolean enablePPD) throws Exception {
+
+ /**
+ * This test is run twice: once with PPD on and once with PPD off.
+ * The queries are chosen so that if the predicate were pushed down to an update/delete delta,
+ * the test would produce wrong results.
+ * @param enablePPD whether ORC predicate pushdown is enabled for this run
+ * @throws Exception
+ */
+ private void testOrcPPD(boolean enablePPD) throws Exception {
boolean originalPpd = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, enablePPD);//enables ORC PPD
- int[][] tableData = {{1,2},{3,4}};
- runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
- List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b");
- runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+ //create delta_0001_0001_0000 (should push predicate here)
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}}));
+ List<String> explain;
+ String query = "update " + Table.ACIDTBL + " set b = 5 where a = 3";
+ if (enablePPD) {
+ explain = runStatementOnDriver("explain " + query);
+ /*
+ here is a portion of the above "explain". The "filterExpr:" in the TableScan is the pushed predicate
+ w/o PPD, the line is simply not there, otherwise the plan is the same
+ Map Operator Tree:,
+ TableScan,
+ alias: acidtbl,
+ filterExpr: (a = 3) (type: boolean),
+ Filter Operator,
+ predicate: (a = 3) (type: boolean),
+ Select Operator,
+ ...
+ */
+ assertPredicateIsPushed("filterExpr: (a = 3)", explain);
+ }
+ //create delta_0002_0002_0000 (can't push predicate)
+ runStatementOnDriver(query);
+ query = "select a,b from " + Table.ACIDTBL + " where b = 4 order by a,b";
+ if (enablePPD) {
+ /*at this point we have 2 delta files, 1 for insert 1 for update
+ * we should push predicate into 1st one but not 2nd. If the following 'select' were to
+ * push into the 'update' delta, we'd filter out {3,5} before doing merge and thus
+ * produce {3,4} as the value for 2nd row. The right result is 0-rows.*/
+ explain = runStatementOnDriver("explain " + query);
+ assertPredicateIsPushed("filterExpr: (b = 4)", explain);
+ }
+ List<String> rs0 = runStatementOnDriver(query);
+ Assert.assertEquals("Read failed", 0, rs0.size());
+ runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'MAJOR'");
Worker t = new Worker();
t.setThreadId((int) t.getId());
t.setHiveConf(hiveConf);
@@ -142,18 +180,37 @@ public class TestTxnCommands2 {
t.init(stop, looped);
t.run();
//now we have base_0001 file
- int[][] tableData2 = {{1,7},{5,6},{7,8},{9,10}};
+ int[][] tableData2 = {{1, 7}, {5, 6}, {7, 8}, {9, 10}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
- //now we have delta_0002_0002_0000 with inserts only (ok to push predicate)
+ //now we have delta_0003_0003_0000 with inserts only (ok to push predicate)
+ if (enablePPD) {
+ explain = runStatementOnDriver("explain delete from " + Table.ACIDTBL + " where a=7 and b=8");
+ assertPredicateIsPushed("filterExpr: ((a = 7) and (b = 8))", explain);
+ }
runStatementOnDriver("delete from " + Table.ACIDTBL + " where a=7 and b=8");
- //now we have delta_0003_0003_0000 with delete events (can't push predicate)
- runStatementOnDriver("update " + Table.ACIDTBL + " set b = 11 where a = 9");
- //and another delta with update op
- List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b");
- int [][] resultData = {{3,4},{5,6},{9,11}};
+ //now we have delta_0004_0004_0000 with delete events
+
+ /*(can't push predicate to 'delete' delta)
+ * if we were to push to 'delete' delta, we'd filter out all rows since the 'row' is always NULL for
+ * delete events and we'd produce data as if the delete never happened*/
+ query = "select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b";
+ if(enablePPD) {
+ explain = runStatementOnDriver("explain " + query);
+ assertPredicateIsPushed("filterExpr: (a > 1)", explain);
+ }
+ List<String> rs1 = runStatementOnDriver(query);
+ int [][] resultData = new int[][] {{3, 5}, {5, 6}, {9, 10}};
Assert.assertEquals("Update failed", stringifyValues(resultData), rs1);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, originalPpd);
}
+ /**
+  * Asserts that the expected pushed-predicate text appears in a query plan.
+  * Returns normally as soon as one plan line contains {@code ppd}; fails the
+  * test if no line does.
+  *
+  * @param ppd text expected in the plan, e.g. {@code "filterExpr: (a = 3)"}
+  * @param queryPlan lines of the plan to search; null lines are skipped
+  */
+ private static void assertPredicateIsPushed(String ppd, List<String> queryPlan) {
+   for(String line : queryPlan) {
+     if(line != null && line.contains(ppd)) {
+       return;
+     }
+   }
+   // No line matched: fail explicitly instead of asserting a constant condition.
+   Assert.fail("PPD '" + ppd + "' wasn't pushed");
+ }
@Ignore("alter table")
@Test
public void testAlterTable() throws Exception {