You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2018/10/13 23:52:10 UTC
[1/2] hive git commit: HIVE-20719: SELECT statement fails after
UPDATE with hive.optimize.sort.dynamic.partition optimization and
vectorization on (Eugene Koifman, reviewed by Gopal V)
Repository: hive
Updated Branches:
refs/heads/master 87414f37e -> 35278429d
HIVE-20719: SELECT statement fails after UPDATE with hive.optimize.sort.dynamic.partition optimization and vectorization on (Eugene Koifman, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3c6a36b9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3c6a36b9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3c6a36b9
Branch: refs/heads/master
Commit: 3c6a36b99d609b1baf5023fc5e801f74486bbb54
Parents: 87414f3
Author: Eugene Koifman <ek...@apache.org>
Authored: Sat Oct 13 16:38:13 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Sat Oct 13 16:38:13 2018 -0700
----------------------------------------------------------------------
.../optimizer/SortedDynPartitionOptimizer.java | 5 +-
.../apache/hadoop/hive/ql/TestTxnCommands3.java | 61 ++++++++++++++++++++
2 files changed, 64 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3c6a36b9/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 2dc2351..314b8b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -208,8 +209,8 @@ public class SortedDynPartitionOptimizer extends Transform {
if(!VirtualColumn.ROWID.getTypeInfo().equals(ci.getType())) {
throw new IllegalStateException("expected 1st column to be ROW__ID but got wrong type: " + ci.toString());
}
- //HIVE-17328: not sure this is correct... I don't think is gets wrapped in UDFToInteger....
- bucketColumns.add(new ExprNodeColumnDesc(ci));
+ //add a cast(ROW__ID as int) to wrap in UDFToInteger()
+ bucketColumns.add(ParseUtils.createConversionCast(new ExprNodeColumnDesc(ci), TypeInfoFactory.intTypeInfo));
} else {
if (!destTable.getSortCols().isEmpty()) {
// Sort columns specified by table
http://git-wip-us.apache.org/repos/asf/hive/blob/3c6a36b9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index a25406d..833e637 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -222,4 +222,65 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
rs = runStatementOnDriver("select a, b from T order by a, b");
Assert.assertEquals(stringifyValues(dataAll), rs);
}
+
+ /**
+ * Checks that rows land in the bucket file matching their bucket column / ROW__ID, for every combination of
+ * vectorization on/off, sort-dynamic-partition optimization on/off and bucketing_version 1/2.
+ * Note: only the vectorized ACID reader verifies that the bucketId inside ROW__ID matches the file name, and only for delete_delta files.
+ */
+ @Test
+ public void testSdpoBucketed() throws Exception {
+ testSdpoBucketed(true, true, 1); // args: (isVectorized, isSdpo, bucketing_version)
+ testSdpoBucketed(true, false, 1);
+ testSdpoBucketed(false, true, 1);
+ testSdpoBucketed(false, false,1);
+
+ testSdpoBucketed(true, true, 2);
+ testSdpoBucketed(true, false, 2);
+ testSdpoBucketed(false, true, 2);
+ testSdpoBucketed(false, false,2);
+ }
+ private void testSdpoBucketed(boolean isVectorized, boolean isSdpo, int bucketing_version)
+ throws Exception { // runs one (vectorization, SDPO, bucketing_version) combination: insert, check, update, check
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION, isSdpo); // NOTE(review): neither setting is reset here; confirm later tests don't rely on the defaults
+ runStatementOnDriver("drop table if exists acid_uap");
+ runStatementOnDriver("create transactional table acid_uap(a int, b varchar(128)) " +
+ "partitioned by (ds string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES " +
+ "('bucketing_version'='" + bucketing_version + "')"); // table-level bucketing_version is the parameter under test
+ runStatementOnDriver("insert into table acid_uap partition (ds='tomorrow') " +
+ "values (1, 'bah'),(2, 'yah')");
+ runStatementOnDriver("insert into table acid_uap partition (ds='today') " +
+ "values (1, 'bah'),(2, 'yah')");
+ runStatementOnDriver("select a,b, ds from acid_uap order by a,b, ds"); // NOTE(review): result is discarded, so this query has no effect on the test
+
+ String testQuery = isVectorized ? // only the non-vectorized query also selects INPUT__FILE__NAME
+ "select ROW__ID, a, b, ds from acid_uap order by ds, a, b" :
+ "select ROW__ID, a, b, ds, INPUT__FILE__NAME from acid_uap order by ds, a, b";
+ String[][] expected = new String[][]{ // writeid 1 = 'tomorrow' insert, writeid 2 = 'today' insert; bucketid 536936448 pairs with bucket_00001, 536870912 with bucket_00000
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttoday",
+ "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00001"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttoday",
+ "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00000"},
+
+ {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttomorrow",
+ "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00001"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttomorrow",
+ "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00000"}};
+ checkResult(expected, testQuery, isVectorized, "after insert", LOG);
+
+ runStatementOnDriver("update acid_uap set b = 'fred'"); // updates every row in both partitions
+
+ String[][] expected2 = new String[][]{ // after the update every row has writeid 3 and keeps the same bucketid / bucket file number as before
+ {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t1\tfred\ttoday",
+ "warehouse/acid_uap/ds=today/delta_0000003_0000003_0000/bucket_00001"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\tfred\ttoday",
+ "warehouse/acid_uap/ds=today/delta_0000003_0000003_0000/bucket_00000"},
+
+ {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t1\tfred\ttomorrow",
+ "warehouse/acid_uap/ds=tomorrow/delta_0000003_0000003_0000/bucket_00001"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\tfred\ttomorrow",
+ "warehouse/acid_uap/ds=tomorrow/delta_0000003_0000003_0000/bucket_00000"}};
+ checkResult(expected2, testQuery, isVectorized, "after update", LOG);
+ }
}
[2/2] hive git commit: HIVE-20723: Allow per table specification of
compaction yarn queue (Saurabh Seth via Eugene Koifman)
Posted by ek...@apache.org.
HIVE-20723: Allow per table specification of compaction yarn queue (Saurabh Seth via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/35278429
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/35278429
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/35278429
Branch: refs/heads/master
Commit: 35278429d9677b0878a4523ed7b03a5016f81e1d
Parents: 3c6a36b
Author: Saurabh Seth <sa...@gmail.com>
Authored: Sat Oct 13 16:46:16 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Sat Oct 13 16:46:16 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/txn/compactor/TestCompactor.java | 10 ++++++++--
.../apache/hadoop/hive/ql/txn/compactor/CompactorMR.java | 11 ++++++-----
2 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/35278429/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index cffa21a..a9d7468 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1584,6 +1584,7 @@ public class TestCompactor {
*/
@Test
public void testTableProperties() throws Exception {
+ conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, "root.user1");
String tblName1 = "ttp1"; // plain acid table
String tblName2 = "ttp2"; // acid table with customized tblproperties
executeStatementOnDriver("drop table if exists " + tblName1, driver);
@@ -1596,7 +1597,8 @@ public class TestCompactor {
"'transactional'='true'," +
"'compactor.mapreduce.map.memory.mb'='2048'," + // 2048 MB memory for compaction map job
"'compactorthreshold.hive.compactor.delta.num.threshold'='4'," + // minor compaction if more than 4 delta dirs
- "'compactorthreshold.hive.compactor.delta.pct.threshold'='0.47'" + // major compaction if more than 47%
+ "'compactorthreshold.hive.compactor.delta.pct.threshold'='0.47'," + // major compaction if more than 47%
+ "'compactor.hive.compactor.job.queue'='root.user2'" + // Override the system wide compactor queue for this table
")", driver);
// Insert 5 rows to both tables
@@ -1641,6 +1643,7 @@ public class TestCompactor {
t.run();
JobConf job = t.getMrJob();
Assert.assertEquals(2048, job.getMemoryForMapTask()); // 2048 comes from tblproperties
+ Assert.assertEquals("root.user2", job.getQueueName()); // Queue name comes from table properties
// Compact ttp1
stop = new AtomicBoolean(true);
t = new Worker();
@@ -1651,6 +1654,7 @@ public class TestCompactor {
t.run();
job = t.getMrJob();
Assert.assertEquals(1024, job.getMemoryForMapTask()); // 1024 is the default value
+ Assert.assertEquals("root.user1", job.getQueueName()); // The system wide compaction queue name
// Clean up
runCleaner(conf);
rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -1702,7 +1706,8 @@ public class TestCompactor {
executeStatementOnDriver("alter table " + tblName2 + " compact 'major'" +
" with overwrite tblproperties (" +
"'compactor.mapreduce.map.memory.mb'='3072'," +
- "'tblprops.orc.compress.size'='3141')", driver);
+ "'tblprops.orc.compress.size'='3141'," +
+ "'compactor.hive.compactor.job.queue'='root.user2')", driver);
rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(4, rsp.getCompacts().size());
@@ -1722,6 +1727,7 @@ public class TestCompactor {
job = t.getMrJob();
Assert.assertEquals(3072, job.getMemoryForMapTask());
Assert.assertTrue(job.get("hive.compactor.table.props").contains("orc.compress.size4:3141"));
+ Assert.assertEquals("root.user2", job.getQueueName());
/*createReader(FileSystem fs, Path path) throws IOException {
*/
//we just ran Major compaction so we should have a base_x in tblName2 that has the new files
http://git-wip-us.apache.org/repos/asf/hive/blob/35278429/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 611f85a..92c74e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -150,11 +150,6 @@ public class CompactorMR {
job.setOutputFormat(NullOutputFormat.class);
job.setOutputCommitter(CompactorOutputCommitter.class);
- String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE);
- if(queueName != null && queueName.length() > 0) {
- job.setQueueName(queueName);
- }
-
job.set(FINAL_LOCATION, sd.getLocation());
job.set(TMP_LOCATION, generateTmpPath(sd));
job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
@@ -167,6 +162,12 @@ public class CompactorMR {
if (ci.properties != null) {
overrideTblProps(job, t.getParameters(), ci.properties);
}
+
+ String queueName = HiveConf.getVar(job, ConfVars.COMPACTOR_JOB_QUEUE);
+ if (queueName != null && queueName.length() > 0) {
+ job.setQueueName(queueName);
+ }
+
setColumnTypes(job, sd.getCols());
//with feature on, multiple tasks may get into conflict creating/using TMP_LOCATION and if we were
//to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter