You are viewing a plain text version of this content. The canonical (hyperlinked) version is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by kg...@apache.org on 2018/08/27 09:54:04 UTC
[4/4] hive git commit: HIVE-20439: Use the inflated memory limit during join selection for llap (Zoltan Haindrich reviewed by Ashutosh Chauhan)
HIVE-20439: Use the inflated memory limit during join selection for llap (Zoltan Haindrich reviewed by Ashutosh Chauhan)
Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fb7a676b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fb7a676b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fb7a676b
Branch: refs/heads/master
Commit: fb7a676b3f6baa3d156b3e7b3d1961a83bb7d698
Parents: f5e62eb
Author: Zoltan Haindrich <ki...@rxd.hu>
Authored: Mon Aug 27 11:04:07 2018 +0200
Committer: Zoltan Haindrich <ki...@rxd.hu>
Committed: Mon Aug 27 11:53:49 2018 +0200
----------------------------------------------------------------------
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 45 +-
.../hadoop/hive/ql/exec/TestOperators.java | 17 +-
.../clientpositive/bucket_map_join_tez2.q | 4 +-
.../bucketsortoptimize_insert_6.q | 2 +-
.../queries/clientpositive/join32_lessSize.q | 2 +-
.../test/queries/clientpositive/tez_smb_main.q | 4 +-
.../queries/clientpositive/unionDistinct_1.q | 2 +-
.../results/clientpositive/llap/orc_llap.q.out | 59 +-
.../clientpositive/spark/join32_lessSize.q.out | 1159 +++++++++---------
9 files changed, 678 insertions(+), 616 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 4145baf..52855e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -87,6 +87,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ConvertJoinMapJoin.class.getName());
private float hashTableLoadFactor;
+ private long maxJoinMemory;
@Override
/*
@@ -103,15 +104,17 @@ public class ConvertJoinMapJoin implements NodeProcessor {
hashTableLoadFactor = context.conf.getFloatVar(ConfVars.HIVEHASHTABLELOADFACTOR);
JoinOperator joinOp = (JoinOperator) nd;
- long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
// adjust noconditional task size threshold for LLAP
LlapClusterStateForCompile llapInfo = null;
if ("llap".equalsIgnoreCase(context.conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
llapInfo = LlapClusterStateForCompile.getClusterInfo(context.conf);
llapInfo.initClusterInfo();
}
- MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(maxSize, context.conf, llapInfo);
+ MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(context.conf, llapInfo);
joinOp.getConf().setMemoryMonitorInfo(memoryMonitorInfo);
+ maxJoinMemory = memoryMonitorInfo.getAdjustedNoConditionalTaskSize();
+
+ LOG.info("maxJoinMemory: {}", maxJoinMemory);
TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
boolean hiveConvertJoin = context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &
@@ -119,11 +122,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
if (!hiveConvertJoin) {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
+ Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
if (retval == null) {
return retval;
} else {
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
}
@@ -138,15 +141,15 @@ public class ConvertJoinMapJoin implements NodeProcessor {
numBuckets = 1;
}
LOG.info("Estimated number of buckets " + numBuckets);
- int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize, true);
+ int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxJoinMemory, true);
if (mapJoinConversionPos < 0) {
- Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
+ Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
if (retval == null) {
return retval;
} else {
// only case is full outer join with SMB enabled which is not possible. Convert to regular
// join.
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
}
@@ -167,12 +170,12 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// check if we can convert to map join no bucket scaling.
LOG.info("Convert to non-bucketed map join");
if (numBuckets != 1) {
- mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxSize, true);
+ mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxJoinMemory, true);
}
if (mapJoinConversionPos < 0) {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
@@ -238,8 +241,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
if (networkCostDPHJ < networkCostMJ) {
LOG.info("Dynamically partitioned Hash Join chosen");
- long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
- return convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize);
+ return convertJoinDynamicPartitionedHashJoin(joinOp, context);
} else if (numBuckets > 1) {
LOG.info("Bucket Map Join chosen");
return convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx);
@@ -271,9 +273,10 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
@VisibleForTesting
- public MemoryMonitorInfo getMemoryMonitorInfo(final long maxSize,
+ public MemoryMonitorInfo getMemoryMonitorInfo(
final HiveConf conf,
LlapClusterStateForCompile llapInfo) {
+ long maxSize = conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
final long memoryCheckInterval = conf.getLongVar(ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
@@ -315,13 +318,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
@SuppressWarnings("unchecked")
private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
- TezBucketJoinProcCtx tezBucketJoinProcCtx, final long maxSize) throws SemanticException {
+ TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
// we cannot convert to bucket map join, we cannot convert to
// map join either based on the size. Check if we can convert to SMB join.
if (!(HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN))
|| ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE))
&& joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
@@ -350,7 +353,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// contains aliases from sub-query
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
@@ -360,7 +363,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
} else {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
}
return null;
}
@@ -893,6 +896,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
long inputSize = computeOnlineDataSize(currInputStat);
+ LOG.info("Join input#{}; onlineDataSize: {}; Statistics: {}", pos, inputSize, currInputStat);
boolean currentInputNotFittingInMemory = false;
if ((bigInputStat == null)
@@ -1271,14 +1275,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return numBuckets;
}
- private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context,
- final long maxSize)
+ private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context)
throws SemanticException {
// Attempt dynamic partitioned hash join
// Since we don't have big table index yet, must start with estimate of numReducers
int numReducers = estimateNumBuckets(joinOp, false);
LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
- int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxSize,false);
+ int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxJoinMemory, false);
if (bigTablePos >= 0) {
// Now that we have the big table index, get real numReducers value based on big table RS
ReduceSinkOperator bigTableParentRS =
@@ -1314,11 +1317,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return false;
}
- private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context, final long maxSize)
+ private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context)
throws SemanticException {
if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &&
context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) {
- if (convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize)) {
+ if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) {
return;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index bbc2453..fe64bf5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -26,8 +26,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import junit.framework.TestCase;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -66,6 +64,8 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.junit.Assert;
import org.junit.Test;
+import junit.framework.TestCase;
+
/**
* TestOperators.
*
@@ -442,6 +442,7 @@ public class TestOperators extends TestCase {
ConvertJoinMapJoin convertJoinMapJoin = new ConvertJoinMapJoin();
long defaultNoConditionalTaskSize = 1024L * 1024L * 1024L;
HiveConf hiveConf = new HiveConf();
+ hiveConf.setLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD, defaultNoConditionalTaskSize);
LlapClusterStateForCompile llapInfo = null;
if ("llap".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
@@ -449,8 +450,8 @@ public class TestOperators extends TestCase {
llapInfo.initClusterInfo();
}
// execution mode not set, null is returned
- assertEquals(defaultNoConditionalTaskSize, convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize,
- hiveConf, llapInfo).getAdjustedNoConditionalTaskSize());
+ assertEquals(defaultNoConditionalTaskSize,
+ convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getAdjustedNoConditionalTaskSize());
hiveConf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap");
if ("llap".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
@@ -464,7 +465,7 @@ public class TestOperators extends TestCase {
int maxSlots = 3;
long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots));
assertEquals(expectedSize,
- convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo)
+ convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo)
.getAdjustedNoConditionalTaskSize());
// num executors is less than max executors per query (which is not expected case), default executors will be
@@ -473,18 +474,18 @@ public class TestOperators extends TestCase {
hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5");
expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * chosenSlots));
assertEquals(expectedSize,
- convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo)
+ convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo)
.getAdjustedNoConditionalTaskSize());
// disable memory checking
hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "0");
assertFalse(
- convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo).doMemoryMonitoring());
+ convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).doMemoryMonitoring());
// invalid inflation factor
hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "10000");
hiveConf.set(HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR.varname, "0.0f");
assertFalse(
- convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo).doMemoryMonitoring());
+ convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).doMemoryMonitoring());
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
index ae1ec44..85f2f2b 100644
--- a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
+++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
@@ -47,7 +47,7 @@ select key,value from srcbucket_mapjoin_n18;
analyze table tab1_n5 compute statistics for columns;
-- A negative test as src is not bucketed.
-set hive.auto.convert.join.noconditionaltask.size=20000;
+set hive.auto.convert.join.noconditionaltask.size=12000;
set hive.convert.join.bucket.mapjoin.tez = false;
explain
select a.key, a.value, b.value
@@ -98,7 +98,7 @@ insert overwrite table tab_part1 partition (ds='2008-04-08')
select key,value from srcbucket_mapjoin_part_n20;
analyze table tab_part1 compute statistics for columns;
-set hive.auto.convert.join.noconditionaltask.size=20000;
+set hive.auto.convert.join.noconditionaltask.size=12000;
set hive.convert.join.bucket.mapjoin.tez = false;
explain
select count(*)
http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q b/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
index cd0a234..2d4907c 100644
--- a/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
+++ b/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
@@ -30,7 +30,7 @@ INSERT OVERWRITE TABLE test_table2_n3 PARTITION (ds = '1') SELECT key, key+1, va
-- Insert data into the bucketed table by selecting from another bucketed table
-- This should be a map-only operation, since the sort-order matches
-set hive.auto.convert.join.noconditionaltask.size=800;
+set hive.auto.convert.join.noconditionaltask.size=400;
EXPLAIN
INSERT OVERWRITE TABLE test_table3_n3 PARTITION (ds = '1')
SELECT a.key, a.key2, concat(a.value, b.value)
http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/join32_lessSize.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/join32_lessSize.q b/ql/src/test/queries/clientpositive/join32_lessSize.q
index 229ba56..fcadbe3 100644
--- a/ql/src/test/queries/clientpositive/join32_lessSize.q
+++ b/ql/src/test/queries/clientpositive/join32_lessSize.q
@@ -9,7 +9,7 @@ CREATE TABLE dest_j2_n1(key STRING, value STRING, val2 STRING) STORED AS TEXTFIL
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
-set hive.auto.convert.join.noconditionaltask.size=6000;
+set hive.auto.convert.join.noconditionaltask.size=4000;
-- Since the inputs are small, it should be automatically converted to mapjoin
http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/tez_smb_main.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_smb_main.q b/ql/src/test/queries/clientpositive/tez_smb_main.q
index db8daa3..c7516b8 100644
--- a/ql/src/test/queries/clientpositive/tez_smb_main.q
+++ b/ql/src/test/queries/clientpositive/tez_smb_main.q
@@ -70,7 +70,7 @@ select count(*)
from tab_n11 a join tab_part_n12 b on a.key = b.key;
-set hive.auto.convert.join.noconditionaltask.size=2000;
+set hive.auto.convert.join.noconditionaltask.size=1000;
set hive.mapjoin.hybridgrace.minwbsize=125;
set hive.mapjoin.hybridgrace.minnumpartitions=4;
set hive.llap.memory.oversubscription.max.executors.per.query=0;
@@ -111,7 +111,7 @@ UNION ALL
select s2.key as key, s2.value as value from tab_n11 s2
) a join tab_part_n12 b on (a.key = b.key);
-set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.join.noconditionaltask.size=5000;
explain
select count(*) from
http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/unionDistinct_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/unionDistinct_1.q b/ql/src/test/queries/clientpositive/unionDistinct_1.q
index f966f42..75c66b0 100644
--- a/ql/src/test/queries/clientpositive/unionDistinct_1.q
+++ b/ql/src/test/queries/clientpositive/unionDistinct_1.q
@@ -158,7 +158,7 @@ set hive.merge.mapfiles=false;
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
-set hive.auto.convert.join.noconditionaltask.size=15000;
+set hive.auto.convert.join.noconditionaltask.size=8000;
-- Since the inputs are small, it should be automatically converted to mapjoin
http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
index a639b68..f4f8278 100644
--- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out
+++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
@@ -1021,8 +1021,8 @@ STAGE PLANS:
Tez
#### A masked pattern was here ####
Edges:
- Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
- Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Map 2 <- Map 1 (BROADCAST_EDGE)
+ Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -1046,7 +1046,7 @@ STAGE PLANS:
value expressions: _col2 (type: string)
Execution mode: vectorized, llap
LLAP IO: all inputs
- Map 4
+ Map 2
Map Operator Tree:
TableScan
alias: o2
@@ -1059,38 +1059,31 @@ STAGE PLANS:
expressions: csmallint (type: smallint), cstring2 (type: string)
outputColumnNames: _col0, _col2
Statistics: Num rows: 136968 Data size: 11042828 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: smallint)
- sort order: +
- Map-reduce partition columns: _col0 (type: smallint)
- Statistics: Num rows: 136968 Data size: 11042828 Basic stats: COMPLETE Column stats: COMPLETE
- value expressions: _col2 (type: string)
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: smallint)
+ 1 _col0 (type: smallint)
+ outputColumnNames: _col2, _col5
+ input vertices:
+ 0 Map 1
+ Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: hash(_col2,_col5) (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col0 (type: bigint)
Execution mode: vectorized, llap
LLAP IO: all inputs
- Reducer 2
- Execution mode: llap
- Reduce Operator Tree:
- Merge Join Operator
- condition map:
- Inner Join 0 to 1
- keys:
- 0 _col0 (type: smallint)
- 1 _col0 (type: smallint)
- outputColumnNames: _col2, _col5
- Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
- Select Operator
- expressions: hash(_col2,_col5) (type: int)
- outputColumnNames: _col0
- Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: sum(_col0)
- mode: hash
- outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- sort order:
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
- value expressions: _col0 (type: bigint)
Reducer 3
Execution mode: vectorized, llap
Reduce Operator Tree: