Posted to commits@hive.apache.org by se...@apache.org on 2016/10/01 01:08:53 UTC
[11/44] hive git commit: HIVE-14783 : bucketing column should be part of sorting for delete/update operation when spdo is on (Ashutosh Chauhan via Prasanth J) Addendum patch
HIVE-14783 : bucketing column should be part of sorting for delete/update operation when spdo is on (Ashutosh Chauhan via Prasanth J)
Addendum patch
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed82cfa9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed82cfa9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed82cfa9
Branch: refs/heads/hive-14535
Commit: ed82cfa914769cfabfc7460b7b5abbdae71e562a
Parents: 91082e5
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Wed Sep 21 15:18:37 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed Sep 21 15:19:18 2016 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/FileSinkOperator.java | 10 +++++-----
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 9 +++++++++
.../optimizer/SortedDynPartitionOptimizer.java | 4 ++--
.../dynpart_sort_optimization_acid.q.out | 20 ++++++++++----------
4 files changed, 26 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ed82cfa9/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index e386717..eeba6cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -766,19 +766,19 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
if (fpaths.acidLastBucket != bucketNum) {
fpaths.acidLastBucket = bucketNum;
// Switch files
- fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
- jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset],
+ fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 : ++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
+ jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset],
rowInspector, reporter, 0);
if (isDebugEnabled) {
LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
- fpaths.outPaths[fpaths.acidFileOffset]);
+ fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset]);
}
}
if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
- fpaths.updaters[fpaths.acidFileOffset].update(conf.getTransactionId(), row);
+ fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].update(conf.getTransactionId(), row);
} else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
- fpaths.updaters[fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+ fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
} else {
throw new HiveException("Unknown write type " + conf.getWriteType().toString());
}
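The hunk above repeats one ternary four times; read it as "when rows arrive partition-bucket sorted, reuse updater slot 0, otherwise advance to the next ACID file offset". A minimal, self-contained sketch of that selection logic follows; the names are simplified stand-ins, not the committed Hive code:

    // Stand-in for org.apache.hadoop.hive.ql.plan.FileSinkDesc.DPSortState.
    enum DPSortState { NONE, PARTITION_SORTED, PARTITION_BUCKET_SORTED }

    class UpdaterSlotSketch {
      private final DPSortState dpSortState;
      private int acidFileOffset = -1;

      UpdaterSlotSketch(DPSortState state) { this.dpSortState = state; }

      // Slot chosen when a new bucket number is seen (mirrors the hunk).
      int nextUpdaterSlot() {
        if (dpSortState == DPSortState.PARTITION_BUCKET_SORTED) {
          // Rows are grouped by partition and bucket, so only one updater
          // is open at a time and slot 0 can be reused for every bucket.
          return 0;
        }
        return ++acidFileOffset; // one open updater per bucket otherwise
      }

      public static void main(String[] args) {
        UpdaterSlotSketch sorted = new UpdaterSlotSketch(DPSortState.PARTITION_BUCKET_SORTED);
        UpdaterSlotSketch plain = new UpdaterSlotSketch(DPSortState.NONE);
        System.out.println(sorted.nextUpdaterSlot()); // 0: slot reused
        System.out.println(plain.nextUpdaterSlot());  // 0: first offset
        System.out.println(plain.nextUpdaterSlot());  // 1: advances per bucket
      }
    }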
http://git-wip-us.apache.org/repos/asf/hive/blob/ed82cfa9/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index a9885d8..4eea6b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -78,6 +80,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
private transient ObjectInspector[] partitionObjectInspectors;
private transient ObjectInspector[] bucketObjectInspectors;
private transient int buckColIdxInKey;
+ private transient int buckColIdxInKeyForAcid = -1;
private boolean firstRow;
private transient int tag;
private boolean skipTag = false;
@@ -183,6 +186,9 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
keyEval = new ExprNodeEvaluator[keys.size()];
int i = 0;
for (ExprNodeDesc e : keys) {
+ if (e instanceof ExprNodeConstantDesc && ("_bucket_number").equals(((ExprNodeConstantDesc)e).getValue())) {
+ buckColIdxInKeyForAcid = i;
+ }
keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
}
@@ -359,6 +365,9 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
// In the non-partitioned case we still want to compute the bucket number for updates and
// deletes.
bucketNumber = computeBucketNumber(row, conf.getNumBuckets());
+ if (buckColIdxInKeyForAcid != -1) {
+ cachedKeys[0][buckColIdxInKeyForAcid] = new Text(String.valueOf(bucketNumber));
+ }
}
HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
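Together, the two additions above reserve a key slot for the synthetic '_bucket_number' constant at init time and patch the computed bucket id into that slot per row for ACID updates and deletes. A hedged, self-contained sketch with simplified stand-ins (plain Strings in place of ExprNodeConstantDesc values and Text):

    import java.util.Arrays;
    import java.util.List;

    class BucketKeyPatchSketch {
      static final String BUCKET_NUMBER_COL = "_bucket_number";

      // Init-time scan (mirrors the first hunk): find the key slot holding
      // the '_bucket_number' placeholder constant, or -1 if there is none.
      static int findBucketSlot(List<Object> keyExprs) {
        for (int i = 0; i < keyExprs.size(); i++) {
          if (BUCKET_NUMBER_COL.equals(keyExprs.get(i))) {
            return i;
          }
        }
        return -1; // not an ACID update/delete planned with SDPO
      }

      // Per-row patch (mirrors the second hunk): overwrite the placeholder
      // with the bucket number computed for this update/delete row.
      static void patchKey(Object[] keyRow, int slot, int bucketNumber) {
        if (slot != -1) {
          keyRow[slot] = String.valueOf(bucketNumber); // a Text in real Hive
        }
      }

      public static void main(String[] args) {
        Object[] key = { "ROW__ID", BUCKET_NUMBER_COL };
        int slot = findBucketSlot(Arrays.asList(key)); // -> 1
        patchKey(key, slot, 7);
        System.out.println(Arrays.toString(key)); // [ROW__ID, 7]
      }
    }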
http://git-wip-us.apache.org/repos/asf/hive/blob/ed82cfa9/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 8b4af72..926386b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -247,7 +247,7 @@ public class SortedDynPartitionOptimizer extends Transform {
}
}
RowSchema selRS = new RowSchema(fsParent.getSchema());
- if (!bucketColumns.isEmpty()) {
+ if (!bucketColumns.isEmpty() || fsOp.getConf().getWriteType() == Operation.DELETE || fsOp.getConf().getWriteType() == Operation.UPDATE) {
descs.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, ReduceField.KEY.toString()+".'"+BUCKET_NUMBER_COL_NAME+"'", null, false));
colNames.add("'"+BUCKET_NUMBER_COL_NAME+"'");
ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo, selRS.getSignature().get(0).getTabAlias(), true, true);
@@ -268,7 +268,7 @@ public class SortedDynPartitionOptimizer extends Transform {
// Set if partition sorted or partition bucket sorted
fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_SORTED);
- if (bucketColumns.size() > 0) {
+ if (bucketColumns.size() > 0 || fsOp.getConf().getWriteType() == Operation.DELETE || fsOp.getConf().getWriteType() == Operation.UPDATE) {
fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_BUCKET_SORTED);
}
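Both hunks widen the same condition: ACID deletes and updates now take the bucket-sorted path even when the table declares no bucket columns, so the synthetic '_bucket_number' key is always emitted for them. A small sketch of the widened predicate (WriteType here is a stand-in for AcidUtils.Operation, not the committed code):

    enum WriteType { INSERT, UPDATE, DELETE }

    class SdpoPredicateSketch {
      // The widened condition from both hunks: bucket-sorted handling now
      // also covers ACID deletes/updates on tables with no bucket columns.
      static boolean partitionBucketSorted(int bucketColumnCount, WriteType writeType) {
        return bucketColumnCount > 0
            || writeType == WriteType.DELETE
            || writeType == WriteType.UPDATE;
      }

      public static void main(String[] args) {
        // Before this addendum an unbucketed DELETE fell through to
        // PARTITION_SORTED; now it gets PARTITION_BUCKET_SORTED.
        System.out.println(partitionBucketSorted(0, WriteType.DELETE)); // true
        System.out.println(partitionBucketSorted(0, WriteType.INSERT)); // false
      }
    }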
http://git-wip-us.apache.org/repos/asf/hive/blob/ed82cfa9/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out b/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
index 1838d6a..111ce18 100644
--- a/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
+++ b/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
@@ -422,8 +422,8 @@ STAGE PLANS:
Statistics: Num rows: 892 Data size: 2676 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Select Operator
- expressions: KEY._col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), 'foo' (type: string), 'bar' (type: string), KEY._col3 (type: string)
- outputColumnNames: _col0, _col1, _col2, _col3
+ expressions: KEY._col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), 'foo' (type: string), 'bar' (type: string), KEY._col3 (type: string), KEY.'_bucket_number' (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, '_bucket_number'
Statistics: Num rows: 892 Data size: 2676 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
@@ -1042,8 +1042,8 @@ STAGE PLANS:
Statistics: Num rows: 1517 Data size: 4551 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Select Operator
- expressions: KEY._col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), 'foo' (type: string), 'bar' (type: string), '2008-04-08' (type: string), KEY._col4 (type: int)
- outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ expressions: KEY._col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), 'foo' (type: string), 'bar' (type: string), '2008-04-08' (type: string), KEY._col4 (type: int), KEY.'_bucket_number' (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, '_bucket_number'
Statistics: Num rows: 1517 Data size: 4551 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
@@ -1152,8 +1152,8 @@ STAGE PLANS:
Statistics: Num rows: 2979 Data size: 8937 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Select Operator
- expressions: KEY._col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), KEY._col1 (type: string), KEY._col2 (type: int)
- outputColumnNames: _col0, _col1, _col2
+ expressions: KEY._col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), KEY._col1 (type: string), KEY._col2 (type: int), KEY.'_bucket_number' (type: string)
+ outputColumnNames: _col0, _col1, _col2, '_bucket_number'
Statistics: Num rows: 2979 Data size: 8937 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
@@ -1327,8 +1327,8 @@ STAGE PLANS:
value expressions: _col1 (type: string), 'bar' (type: string)
Reduce Operator Tree:
Select Operator
- expressions: KEY._col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col1 (type: string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: int)
- outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ expressions: KEY._col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col1 (type: string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: int), KEY.'_bucket_number' (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, '_bucket_number'
Statistics: Num rows: 23 Data size: 2322 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
@@ -1407,8 +1407,8 @@ STAGE PLANS:
value expressions: _col1 (type: string), 'bar' (type: string)
Reduce Operator Tree:
Select Operator
- expressions: KEY._col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col1 (type: string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: int)
- outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ expressions: KEY._col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col1 (type: string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: int), KEY.'_bucket_number' (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, '_bucket_number'
Statistics: Num rows: 45 Data size: 4550 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false