You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kg...@apache.org on 2018/03/07 08:19:20 UTC
[03/22] hive git commit: HIVE-17626: Query reoptimization using
cached runtime statistics (Zoltan Haindrich reviewed by Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index 9a3f81c..22b052c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -27,11 +27,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
@@ -74,6 +75,8 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.Statistics;
import org.apache.hadoop.hive.ql.plan.Statistics.State;
+import org.apache.hadoop.hive.ql.plan.mapper.RuntimeStatsSource;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
@@ -132,7 +135,9 @@ public class StatsRulesProcFactory {
try {
// gather statistics for the first time and the attach it to table scan operator
Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, colStatsCached, table, tsop);
- tsop.setStatistics(stats.clone());
+
+ stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) tsop);
+ tsop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
LOG.debug("[0] STATS-" + tsop.toString() + " (" + table.getTableName() + "): " +
@@ -144,6 +149,7 @@ public class StatsRulesProcFactory {
}
return null;
}
+
}
/**
@@ -181,14 +187,15 @@ public class StatsRulesProcFactory {
if (satisfyPrecondition(parentStats)) {
// this will take care of mapping between input column names and output column names. The
// returned column stats will have the output column names.
- List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(conf, parentStats,
- sop.getColumnExprMap(), sop.getSchema());
+ List<ColStatistics> colStats =
+ StatsUtils.getColStatisticsFromExprMap(conf, parentStats, sop.getColumnExprMap(), sop.getSchema());
stats.setColumnStats(colStats);
// in case of select(*) the data size does not change
if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) {
long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
stats.setDataSize(dataSize);
}
+ stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) sop);
sop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
@@ -196,7 +203,8 @@ public class StatsRulesProcFactory {
}
} else {
if (parentStats != null) {
- sop.setStatistics(parentStats.clone());
+ stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) sop);
+ sop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
LOG.debug("[1] STATS-" + sop.toString() + ": " + parentStats.extendedToString());
@@ -299,7 +307,10 @@ public class StatsRulesProcFactory {
LOG.debug("[1] STATS-" + fop.toString() + ": " + st.extendedToString());
}
}
+
+ st = applyRuntimeStats(aspCtx.getParseContext().getContext(), st, (Operator<?>) fop);
fop.setStatistics(st);
+
aspCtx.setAndExprStats(null);
}
return null;
@@ -1249,6 +1260,7 @@ public class StatsRulesProcFactory {
}
}
+ stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) gop);
gop.setStatistics(stats);
if (LOG.isDebugEnabled() && stats != null) {
@@ -1576,6 +1588,7 @@ public class StatsRulesProcFactory {
}
}
+ stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, jop);
jop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
@@ -1665,6 +1678,7 @@ public class StatsRulesProcFactory {
}
}
+ wcStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), wcStats, jop);
jop.setStatistics(wcStats);
if (LOG.isDebugEnabled()) {
@@ -2215,6 +2229,7 @@ public class StatsRulesProcFactory {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
+ AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
LimitOperator lop = (LimitOperator) nd;
Operator<? extends OperatorDesc> parent = lop.getParentOperators().get(0);
Statistics parentStats = parent.getStatistics();
@@ -2232,6 +2247,7 @@ public class StatsRulesProcFactory {
if (limit <= parentStats.getNumRows()) {
updateStats(stats, limit, true, lop);
}
+ stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) lop);
lop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
@@ -2243,7 +2259,8 @@ public class StatsRulesProcFactory {
// in the absence of column statistics, compute data size based on
// based on average row size
limit = StatsUtils.getMaxIfOverflow(limit);
- Statistics wcStats = parentStats.scaleToRowCount(limit);
+ Statistics wcStats = parentStats.scaleToRowCount(limit, true);
+ wcStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), wcStats, (Operator<?>) lop);
lop.setStatistics(wcStats);
if (LOG.isDebugEnabled()) {
LOG.debug("[1] STATS-" + lop.toString() + ": " + wcStats.extendedToString());
@@ -2265,8 +2282,7 @@ public class StatsRulesProcFactory {
public static class ReduceSinkStatsRule extends DefaultStatsRule implements NodeProcessor {
@Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
ReduceSinkOperator rop = (ReduceSinkOperator) nd;
Operator<? extends OperatorDesc> parent = rop.getParentOperators().get(0);
Statistics parentStats = parent.getStatistics();
@@ -2283,8 +2299,7 @@ public class StatsRulesProcFactory {
String prefixedKey = Utilities.ReduceField.KEY.toString() + "." + key;
ExprNodeDesc end = colExprMap.get(prefixedKey);
if (end != null) {
- ColStatistics cs = StatsUtils
- .getColStatisticsFromExpression(conf, parentStats, end);
+ ColStatistics cs = StatsUtils.getColStatisticsFromExpression(conf, parentStats, end);
if (cs != null) {
cs.setColumnName(prefixedKey);
colStats.add(cs);
@@ -2296,8 +2311,7 @@ public class StatsRulesProcFactory {
String prefixedVal = Utilities.ReduceField.VALUE.toString() + "." + val;
ExprNodeDesc end = colExprMap.get(prefixedVal);
if (end != null) {
- ColStatistics cs = StatsUtils
- .getColStatisticsFromExpression(conf, parentStats, end);
+ ColStatistics cs = StatsUtils.getColStatisticsFromExpression(conf, parentStats, end);
if (cs != null) {
cs.setColumnName(prefixedVal);
colStats.add(cs);
@@ -2307,6 +2321,8 @@ public class StatsRulesProcFactory {
outStats.setColumnStats(colStats);
}
+
+ outStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), outStats, (Operator<?>) rop);
rop.setStatistics(outStats);
if (LOG.isDebugEnabled()) {
LOG.debug("[0] STATS-" + rop.toString() + ": " + outStats.extendedToString());
@@ -2355,6 +2371,7 @@ public class StatsRulesProcFactory {
LOG.debug("[0] STATS-" + op.toString() + ": " + stats.extendedToString());
}
}
+ stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, op);
op.getConf().setStatistics(stats);
}
}
@@ -2473,4 +2490,24 @@ public class StatsRulesProcFactory {
return stats != null && stats.getBasicStatsState().equals(Statistics.State.COMPLETE)
&& !stats.getColumnStatsState().equals(Statistics.State.NONE);
}
+
+
+ private static Statistics applyRuntimeStats(Context context, Statistics stats, Operator<?> op) {
+ if (!context.getRuntimeStatsSource().isPresent()) {
+ return stats;
+ }
+ RuntimeStatsSource rss = context.getRuntimeStatsSource().get();
+
+ Optional<OperatorStats> os = rss.lookup(op);
+
+ if (!os.isPresent()) {
+ return stats;
+ }
+ LOG.debug("using runtime stats for {}; {}", op, os.get());
+ Statistics outStats = stats.clone();
+ outStats = outStats.scaleToRowCount(os.get().getOutputRecords(), false);
+ outStats.setRuntimeStats(true);
+ return outStats;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 78cbf25..a1ec96c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -303,6 +303,7 @@ KW_COMPACTIONS: 'COMPACTIONS';
KW_TRANSACTIONS: 'TRANSACTIONS';
KW_REWRITE : 'REWRITE';
KW_AUTHORIZATION: 'AUTHORIZATION';
+KW_REOPTIMIZATION: 'REOPTIMIZATION';
KW_CONF: 'CONF';
KW_VALUES: 'VALUES';
KW_RELOAD: 'RELOAD';
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 0c6aece..3abc752 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -774,14 +774,21 @@ explainStatement
: KW_EXPLAIN (
explainOption* execStatement -> ^(TOK_EXPLAIN execStatement explainOption*)
|
- KW_REWRITE queryStatementExpression -> ^(TOK_EXPLAIN_SQ_REWRITE queryStatementExpression))
+ KW_REWRITE queryStatementExpression -> ^(TOK_EXPLAIN_SQ_REWRITE queryStatementExpression)
+ )
;
explainOption
@init { msgs.push("explain option"); }
@after { msgs.pop(); }
- : KW_EXTENDED|KW_FORMATTED|KW_DEPENDENCY|KW_LOGICAL|KW_AUTHORIZATION|KW_ANALYZE|
- (KW_VECTORIZATION vectorizationOnly? vectorizatonDetail?)
+ : KW_EXTENDED
+ | KW_FORMATTED
+ | KW_DEPENDENCY
+ | KW_LOGICAL
+ | KW_AUTHORIZATION
+ | KW_ANALYZE
+ | KW_REOPTIMIZATION
+ | (KW_VECTORIZATION vectorizationOnly? vectorizatonDetail?)
;
vectorizationOnly
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 35f9edf..2bba33f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -832,6 +832,7 @@ nonReserved
| KW_ZONE
| KW_TIMESTAMPTZ
| KW_DEFAULT
+ | KW_REOPTIMIZATION
| KW_RESOURCE | KW_PLAN | KW_PLANS | KW_QUERY_PARALLELISM | KW_ACTIVATE | KW_MOVE | KW_DO
| KW_POOL | KW_ALLOC_FRACTION | KW_SCHEDULING_POLICY | KW_PATH | KW_MAPPING | KW_WORKLOAD | KW_MANAGEMENT | KW_ACTIVE | KW_UNMANAGED
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index 714cf39..e04a783 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -22,12 +22,9 @@ package org.apache.hadoop.hive.ql.plan;
import java.util.HashMap;
import java.util.Map;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.PTFUtils;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
-import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
-public class AbstractOperatorDesc implements OperatorDesc {
+public abstract class AbstractOperatorDesc implements OperatorDesc {
protected boolean vectorMode = false;
@@ -125,10 +122,12 @@ public class AbstractOperatorDesc implements OperatorDesc {
this.memAvailable = memoryAvailble;
}
+ @Override
public String getRuntimeStatsTmpDir() {
return runtimeStatsTmpDir;
}
+ @Override
public void setRuntimeStatsTmpDir(String runtimeStatsTmpDir) {
this.runtimeStatsTmpDir = runtimeStatsTmpDir;
}
@@ -161,4 +160,9 @@ public class AbstractOperatorDesc implements OperatorDesc {
this.colExprMap = colExprMap;
}
+ @Override
+ public void fillSignature(Map<String, Object> ret) {
+ throw new RuntimeException();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
index 7d5be6b..a68371a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
@@ -19,14 +19,15 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.IOException;
-import java.util.List;
import java.util.Objects;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
import org.apache.hadoop.io.DataOutputBuffer;
+
@SuppressWarnings("serial")
@Explain(displayName = "Application Master Event Operator")
public class AppMasterEventDesc extends AbstractOperatorDesc {
@@ -36,11 +37,13 @@ public class AppMasterEventDesc extends AbstractOperatorDesc {
private String inputName;
@Explain(displayName = "Target Vertex")
+ @Signature
public String getVertexName() {
return vertexName;
}
@Explain(displayName = "Target Input")
+ @Signature
public String getInputName() {
return inputName;
}
@@ -53,6 +56,7 @@ public class AppMasterEventDesc extends AbstractOperatorDesc {
this.vertexName = vertexName;
}
+ @Signature
public TableDesc getTable() {
return table;
}
@@ -98,4 +102,5 @@ public class AppMasterEventDesc extends AbstractOperatorDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
index 7332693..5a81add 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -38,6 +40,7 @@ public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable {
this.mapJoinConversionPos = mapJoinConversionPos;
}
+ @Signature
public int getNumBuckets() {
return numBuckets;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
index 5d3fdb8..32c6c6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
@@ -23,6 +23,7 @@ import java.util.Objects;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -67,6 +68,7 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc {
return targetColumnName + " (" + targetColumnType + ")";
}
+ @Signature
public String getTargetColumnName() {
return targetColumnName;
}
@@ -75,6 +77,7 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc {
this.targetColumnName = columnName;
}
+ @Signature
public String getTargetColumnType() {
return targetColumnType;
}
@@ -94,6 +97,7 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc {
}
@Explain(displayName = "Partition key expr")
+ @Signature
public String getPartKeyString() {
return this.partKey.getExprString();
}
@@ -112,4 +116,5 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index ce61fc5..e15a49f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -25,6 +25,7 @@ import java.util.Objects;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
@@ -191,6 +192,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
}
@Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
+ @Signature
public Path getDirName() {
return dirName;
}
@@ -214,6 +216,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
}
@Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ @Signature
public TableDesc getTableInfo() {
return tableInfo;
}
@@ -223,6 +226,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
}
@Explain(displayName = "compressed")
+ @Signature
public boolean getCompressed() {
return compressed;
}
@@ -232,6 +236,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
}
@Explain(displayName = "GlobalTableId", explainLevels = { Level.EXTENDED })
+ @Signature
+
public int getDestTableId() {
return destTableId;
}
@@ -260,6 +266,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
* @return the multiFileSpray
*/
@Explain(displayName = "MultiFileSpray", explainLevels = { Level.EXTENDED })
+ @Signature
+
public boolean isMultiFileSpray() {
return multiFileSpray;
}
@@ -311,6 +319,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
* @return the totalFiles
*/
@Explain(displayName = "TotalFiles", explainLevels = { Level.EXTENDED })
+ @Signature
+
public int getTotalFiles() {
return totalFiles;
}
@@ -340,6 +350,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
* @return the numFiles
*/
@Explain(displayName = "NumFilesPerFileSink", explainLevels = { Level.EXTENDED })
+ @Signature
+
public int getNumFiles() {
return numFiles;
}
@@ -364,6 +376,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
}
@Explain(displayName = "Static Partition Specification", explainLevels = { Level.EXTENDED })
+ @Signature
public String getStaticSpec() {
return staticSpec;
}
@@ -374,6 +387,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
@Override
@Explain(displayName = "GatherStats", explainLevels = { Level.EXTENDED })
+ @Signature
+
public boolean isGatherStats() {
return gatherStats;
}
@@ -391,6 +406,9 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
*/
@Override
@Explain(displayName = "Stats Publishing Key Prefix", explainLevels = { Level.EXTENDED })
+ // FIXME: including this in the signature will almost certainly differ even if the operator is doing the same
+ // there might be conflicting usages of logicalCompare?
+ @Signature
public String getStatsAggPrefix() {
// dirName uniquely identifies destination directory of a FileSinkOperator.
// If more than one FileSinkOperator write to the same partition, this dirName
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
index d59834c..fc7327a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
@@ -109,6 +110,7 @@ public class FilterDesc extends AbstractOperatorDesc {
this.sampleDescr = sampleDescr;
}
+ @Signature
public String getPredicateString() {
return PlanUtils.getExprListString(Arrays.asList(predicate));
}
@@ -137,6 +139,7 @@ public class FilterDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "isSamplingPred", explainLevels = { Level.EXTENDED })
+ @Signature
public boolean getIsSamplingPred() {
return isSamplingPred;
}
@@ -154,6 +157,7 @@ public class FilterDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "sampleDesc", explainLevels = { Level.EXTENDED })
+ @Signature
public String getSampleDescExpr() {
return sampleDescr == null ? null : sampleDescr.toString();
}
@@ -234,4 +238,5 @@ public class FilterDesc extends AbstractOperatorDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
index 86cc77d..31237c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
@@ -25,16 +25,12 @@ import java.util.Objects;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationDesc;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hive.common.util.AnnotationUtils;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
/**
@@ -129,6 +125,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "mode")
+ @Signature
public String getModeString() {
switch (mode) {
case COMPLETE:
@@ -155,6 +152,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "keys")
+ @Signature
public String getKeyString() {
return PlanUtils.getExprListString(keys);
}
@@ -173,6 +171,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "outputColumnNames")
+ @Signature
public ArrayList<java.lang.String> getOutputColumnNames() {
return outputColumnNames;
}
@@ -183,6 +182,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "pruneGroupingSetId", displayOnlyOnTrue = true)
+ @Signature
public boolean pruneGroupingSetId() {
return groupingSetPosition >= 0 &&
outputColumnNames.size() != keys.size() + aggregators.size();
@@ -210,6 +210,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "aggregations", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ @Signature
public List<String> getAggregatorStrings() {
List<String> res = new ArrayList<String>();
for (AggregationDesc agg: aggregators) {
@@ -235,6 +236,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "bucketGroup", displayOnlyOnTrue = true)
+ @Signature
public boolean getBucketGroup() {
return bucketGroup;
}
@@ -424,4 +426,5 @@ public class GroupByDesc extends AbstractOperatorDesc {
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
index 9c651ab..a61a47e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -289,6 +290,7 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable {
@Override
@Explain(displayName = "filter mappings", explainLevels = { Level.EXTENDED })
+ @Signature
public Map<Integer, String> getFilterMapString() {
return toCompactString(filterMap);
}
@@ -304,6 +306,7 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable {
/**
* @return the keys in string form
*/
+ @Override
@Explain(displayName = "keys")
public Map<Byte, String> getKeysString() {
Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>();
@@ -313,6 +316,7 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable {
return keyMap;
}
+ @Override
@Explain(displayName = "keys", explainLevels = { Level.USER })
public Map<Byte, String> getUserLevelExplainKeysString() {
Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>();
@@ -399,4 +403,5 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
index 6dcf05a..ea22131 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
/**
* Join conditions Descriptor implementation.
- *
+ *
*/
public class JoinCondDesc implements Serializable {
private static final long serialVersionUID = 1L;
@@ -153,7 +153,7 @@ public class JoinCondDesc implements Serializable {
@Explain(explainLevels = { Level.USER })
public String getUserLevelJoinCondString() {
- JSONObject join = new JSONObject(new LinkedHashMap());
+ JSONObject join = new JSONObject(new LinkedHashMap<>());
try {
switch (type) {
case JoinDesc.INNER_JOIN:
@@ -200,4 +200,6 @@ public class JoinCondDesc implements Serializable {
}
return true;
}
+
+ // XXX: is hashCode missing here?
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index bd45c75..5b7f4c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -30,6 +30,7 @@ import java.util.Objects;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -229,6 +230,7 @@ public class JoinDesc extends AbstractOperatorDesc {
* @return the keys in string form
*/
@Explain(displayName = "keys")
+ @Signature
public Map<Byte, String> getKeysString() {
if (joinKeys == null) {
return null;
@@ -266,6 +268,7 @@ public class JoinDesc extends AbstractOperatorDesc {
* @return Map from alias to filters on the alias.
*/
@Explain(displayName = "filter predicates")
+ @Signature
public Map<Byte, String> getFiltersStringMap() {
if (getFilters() == null || getFilters().size() == 0) {
return null;
@@ -342,6 +345,7 @@ public class JoinDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "outputColumnNames")
+ @Signature
public List<String> getOutputColumnNames() {
return outputColumnNames;
}
@@ -365,6 +369,7 @@ public class JoinDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "condition map", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ @Signature
public List<JoinCondDesc> getCondsList() {
if (conds == null) {
return null;
@@ -425,6 +430,7 @@ public class JoinDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "handleSkewJoin", displayOnlyOnTrue = true)
+ @Signature
public boolean getHandleSkewJoin() {
return handleSkewJoin;
}
@@ -524,6 +530,7 @@ public class JoinDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "nullSafes")
+ @Signature
public String getNullSafeString() {
if (nullsafes == null) {
return null;
@@ -740,4 +747,5 @@ public class JoinDesc extends AbstractOperatorDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java
index 3837a49..85a4683 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan;
import java.util.ArrayList;
import java.util.Objects;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -49,6 +50,7 @@ public class LateralViewJoinDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "outputColumnNames")
+ @Signature
public ArrayList<String> getOutputInternalColNames() {
return outputInternalColNames;
}
@@ -74,4 +76,5 @@ public class LateralViewJoinDesc extends AbstractOperatorDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
index ce53fea..698af94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.plan;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
@@ -58,6 +59,7 @@ public class LimitDesc extends AbstractOperatorDesc {
this.offset = offset;
}
+ @Signature
@Explain(displayName = "Number of rows", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public int getLimit() {
return limit;
@@ -100,4 +102,5 @@ public class LimitDesc extends AbstractOperatorDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index cf4ab60..91ea159 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -33,6 +33,7 @@ import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
@@ -139,6 +140,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
}
@Explain(displayName = "input vertices", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ @Signature
public Map<Integer, String> getParentToInput() {
return parentToInput;
}
@@ -156,6 +158,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
}
@Explain(displayName = "Estimated key counts", explainLevels = { Level.EXTENDED })
+ @Signature
public String getKeyCountsExplainDesc() {
StringBuilder result = null;
for (Map.Entry<Integer, Long> entry : parentKeyCounts.entrySet()) {
@@ -250,6 +253,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
* @return the position of the big table not in memory
*/
@Explain(displayName = "Position of Big Table", explainLevels = { Level.EXTENDED })
+ @Signature
public int getPosBigTable() {
return posBigTable;
}
@@ -340,6 +344,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
}
@Explain(displayName = "BucketMapJoin", explainLevels = { Level.USER, Level.EXTENDED }, displayOnlyOnTrue = true)
+ @Signature
public boolean isBucketMapJoin() {
return isBucketMapJoin;
}
@@ -607,4 +612,5 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
index 870b4d9..e8a5827 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
@@ -34,7 +34,10 @@ public interface OperatorDesc extends Serializable, Cloneable {
public void setMaxMemoryAvailable(long memoryAvailble);
public String getRuntimeStatsTmpDir();
public void setRuntimeStatsTmpDir(String runtimeStatsTmpDir);
+
boolean isSame(OperatorDesc other);
public Map<String, ExprNodeDesc> getColumnExprMap();
public void setColumnExprMap(Map<String, ExprNodeDesc> colExprMap);
+
+ void fillSignature(Map<String, Object> ret);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index bf24ff8..f2955af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc.ReduceSinkKeyType;
@@ -97,7 +98,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
private float topNMemoryUsage = -1;
private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded
//flag used to control how TopN handled for PTF/Windowing partitions.
- private boolean isPTFReduceSink = false;
+ private boolean isPTFReduceSink = false;
private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable
private boolean forwarding; // Whether this RS can forward records directly instead of shuffling/sorting
@@ -206,6 +207,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
return PlanUtils.getExprListString(keyCols);
}
+ @Signature
public java.util.ArrayList<ExprNodeDesc> getKeyCols() {
return keyCols;
}
@@ -227,6 +229,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
return PlanUtils.getExprListString(valueCols);
}
+ @Signature
public java.util.ArrayList<ExprNodeDesc> getValueCols() {
return valueCols;
}
@@ -245,6 +248,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
return PlanUtils.getExprListString(partitionCols, true);
}
+ @Signature
public java.util.ArrayList<ExprNodeDesc> getPartitionCols() {
return partitionCols;
}
@@ -261,6 +265,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
return false;
}
+ @Signature
@Explain(displayName = "tag", explainLevels = { Level.EXTENDED })
public int getTag() {
return tag;
@@ -270,6 +275,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
this.tag = tag;
}
+ @Signature
public int getTopN() {
return topN;
}
@@ -349,6 +355,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
* of the same length as key columns, that consists of only "+"
* (ascending order) and "-" (descending order).
*/
+ @Signature
@Explain(displayName = "sort order")
public String getOrder() {
return keySerializeInfo.getProperties().getProperty(
@@ -437,6 +444,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
return forwarding;
}
+ @Signature
@Explain(displayName = "auto parallelism", explainLevels = { Level.EXTENDED })
public final boolean isAutoParallel() {
return (this.reduceTraits.contains(ReducerTraits.AUTOPARALLEL));
@@ -462,7 +470,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
// reducers or hash function.
boolean wasUnset = this.reduceTraits.remove(ReducerTraits.UNSET);
-
+
if (this.reduceTraits.contains(ReducerTraits.FIXED)) {
return;
} else if (traits.contains(ReducerTraits.FIXED)) {
@@ -661,4 +669,5 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java
index 858de98..53fca99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java
@@ -18,12 +18,13 @@
package org.apache.hadoop.hive.ql.plan;
+import java.util.Objects;
+
import org.apache.hadoop.hive.ql.exec.RecordReader;
import org.apache.hadoop.hive.ql.exec.RecordWriter;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
-import java.util.Objects;
-
/**
* ScriptDesc.
@@ -63,6 +64,7 @@ public class ScriptDesc extends AbstractOperatorDesc {
this.scriptErrInfo = scriptErrInfo;
}
+ @Signature
@Explain(displayName = "command", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public String getScriptCmd() {
return scriptCmd;
@@ -72,6 +74,7 @@ public class ScriptDesc extends AbstractOperatorDesc {
this.scriptCmd = scriptCmd;
}
+ @Signature
@Explain(displayName = "output info", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public TableDesc getScriptOutputInfo() {
return scriptOutputInfo;
@@ -154,4 +157,5 @@ public class ScriptDesc extends AbstractOperatorDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java
index e38e7e4..51b94fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
@@ -72,6 +73,7 @@ public class SelectDesc extends AbstractOperatorDesc {
return ret;
}
+ @Signature
@Explain(displayName = "expressions")
public String getColListString() {
return PlanUtils.getExprListString(colList);
@@ -86,6 +88,7 @@ public class SelectDesc extends AbstractOperatorDesc {
this.colList = colList;
}
+ @Signature
@Explain(displayName = "outputColumnNames")
public List<java.lang.String> getOutputColumnNames() {
return outputColumnNames;
@@ -101,6 +104,7 @@ public class SelectDesc extends AbstractOperatorDesc {
this.outputColumnNames = outputColumnNames;
}
+ @Signature
@Explain(displayName = "SELECT * ")
public String explainNoCompute() {
if (isSelStarNoCompute()) {
@@ -184,4 +188,5 @@ public class SelectDesc extends AbstractOperatorDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
index 0057f0c..fd461ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
@@ -49,6 +49,7 @@ public class Statistics implements Serializable {
private State basicStatsState;
private Map<String, ColStatistics> columnStats;
private State columnStatsState;
+ private boolean runtimeStats;
public Statistics() {
this(0, 0);
@@ -119,6 +120,9 @@ public class Statistics implements Serializable {
@Explain(displayName = "Statistics")
public String toString() {
StringBuilder sb = new StringBuilder();
+ if (runtimeStats) {
+ sb.append("(RUNTIME) ");
+ }
sb.append("Num rows: ");
sb.append(numRows);
if (runTimeNumRows >= 0) {
@@ -136,6 +140,9 @@ public class Statistics implements Serializable {
@Explain(displayName = "Statistics", explainLevels = { Level.USER })
public String toUserLevelExplainString() {
StringBuilder sb = new StringBuilder();
+ if (runtimeStats) {
+ sb.append("runtime: ");
+ }
sb.append("rows=");
sb.append(numRows);
if (runTimeNumRows >= 0) {
@@ -153,6 +160,9 @@ public class Statistics implements Serializable {
public String extendedToString() {
StringBuilder sb = new StringBuilder();
+ if (runtimeStats) {
+ sb.append(" (runtime) ");
+ }
sb.append(" numRows: ");
sb.append(numRows);
sb.append(" dataSize: ");
@@ -179,6 +189,8 @@ public class Statistics implements Serializable {
}
clone.setColumnStats(cloneColStats);
}
+ // TODO: this boolean flag is set only by RS stats annotation at this point
+ //clone.setRuntimeStats(runtimeStats);
return clone;
}
@@ -300,10 +312,13 @@ public class Statistics implements Serializable {
this.runTimeNumRows = runTimeNumRows;
}
- public Statistics scaleToRowCount(long newRowCount) {
+ public Statistics scaleToRowCount(long newRowCount, boolean downScaleOnly) {
Statistics ret;
ret = clone();
- if(numRows == 0 || newRowCount >= numRows) {
+ if (numRows == 0) {
+ return ret;
+ }
+ if (downScaleOnly && newRowCount >= numRows) {
return ret;
}
// FIXME: using real scaling by new/old ration might yield better results?
@@ -311,4 +326,12 @@ public class Statistics implements Serializable {
ret.dataSize = StatsUtils.safeMult(getAvgRowSize(), newRowCount);
return ret;
}
+
+ public boolean isRuntimeStats() {
+ return runtimeStats;
+ }
+
+ public void setRuntimeStats(final boolean runtimeStats) {
+ this.runtimeStats = runtimeStats;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 59968fa..57df7e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.parse.TableSample;
import org.apache.hadoop.hive.ql.plan.BaseWork.BaseExplainVectorization;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -156,10 +157,20 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
}
@Explain(displayName = "alias")
+ // FIXME: this might not needed to be in the signature; but in that case the compare shouldn't consider it either!
+ @Signature
public String getAlias() {
return alias;
}
+ @Signature
+ public String getPredicateString() {
+ if (filterExpr == null) {
+ return null;
+ }
+ return PlanUtils.getExprListString(Arrays.asList(filterExpr));
+ }
+
@Explain(displayName = "table", jsonOnly = true)
public String getTableName() {
return this.tableName;
@@ -219,6 +230,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
return PlanUtils.getExprListString(Arrays.asList(filterExpr));
}
+ // @Signature // XXX
public ExprNodeGenericFuncDesc getFilterExpr() {
return filterExpr;
}
@@ -296,6 +308,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
@Override
@Explain(displayName = "GatherStats", explainLevels = { Level.EXTENDED })
+ @Signature
public boolean isGatherStats() {
return gatherStats;
}
@@ -347,6 +360,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
this.rowLimit = rowLimit;
}
+ @Signature
public int getRowLimit() {
return rowLimit;
}
@@ -372,6 +386,11 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
return isMetadataOnly;
}
+ // @Signature
+ public String getQualifiedTable() {
+ return tableMetadata.getFullyQualifiedName();
+ }
+
public Table getTableMetadata() {
return tableMetadata;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java
index cf8e6e5..adcf707 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java
@@ -19,10 +19,12 @@
package org.apache.hadoop.hive.ql.plan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
import java.util.Objects;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
/**
* All member variables should have a setters and getters of the form get<member
@@ -54,6 +56,7 @@ public class UDTFDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "function name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ @Signature
public String getUDTFName() {
return genericUDTF.toString();
}
@@ -67,6 +70,7 @@ public class UDTFDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "outer lateral view")
+ @Signature
public String isOuterLateralView() {
return outerLV ? "true" : null;
}
@@ -80,4 +84,5 @@ public class UDTFDesc extends AbstractOperatorDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
new file mode 100644
index 0000000..57762ed
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+public class EmptyStatsSource implements StatsSource {
+
+ @Override
+ public boolean canProvideStatsFor(Class<?> class1) {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java
new file mode 100644
index 0000000..7b9e99e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.LinkGroup;
+
+public interface GroupTransformer {
+ void map(LinkGroup group);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
new file mode 100644
index 0000000..36d7e58
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Enables to connect related objects to eachother.
+ *
+ * Most importantly it aids to connect Operators to OperatorStats and probably RelNodes.
+ */
+public class PlanMapper {
+
+ Set<LinkGroup> groups = new HashSet<>();
+ private Map<Object, LinkGroup> objectMap = new HashMap<>();
+
+ public class LinkGroup {
+ Set<Object> members = new HashSet<>();
+
+ public void add(Object o) {
+ members.add(o);
+ objectMap.put(o, this);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> List<T> getAll(Class<T> clazz) {
+ List<T> ret = new ArrayList<>();
+ for (Object m : members) {
+ if (clazz.isInstance(m)) {
+ ret.add((T) m);
+ }
+ }
+ return ret;
+ }
+ }
+
+ public void link(Object o1, Object o2) {
+ LinkGroup g1 = objectMap.get(o1);
+ LinkGroup g2 = objectMap.get(o2);
+ if (g1 != null && g2 != null && g1 != g2) {
+ throw new RuntimeException("equivalence mapping violation");
+ }
+ LinkGroup targetGroup = (g1 != null) ? g1 : (g2 != null ? g2 : new LinkGroup());
+ groups.add(targetGroup);
+ targetGroup.add(o1);
+ targetGroup.add(o2);
+ }
+
+ public <T> List<T> getAll(Class<T> clazz) {
+ List<T> ret = new ArrayList<>();
+ for (LinkGroup g : groups) {
+ ret.addAll(g.getAll(clazz));
+ }
+ return ret;
+ }
+
+ public void runMapper(GroupTransformer mapper) {
+ for (LinkGroup equivGroup : groups) {
+ mapper.map(equivGroup);
+ }
+ }
+
+ public <T> List<T> lookupAll(Class<T> clazz, Object key) {
+ LinkGroup group = objectMap.get(key);
+ if (group == null) {
+ throw new NoSuchElementException(Objects.toString(key));
+ }
+ return group.getAll(clazz);
+ }
+
+ public <T> T lookup(Class<T> clazz, Object key) {
+ List<T> all = lookupAll(clazz, key);
+ if (all.size() != 1) {
+ // FIXME: use a different exception type?
+ throw new IllegalArgumentException("Expected match count is 1; but got:" + all);
+ }
+ return all.get(0);
+ }
+
+ @VisibleForTesting
+ public Iterator<LinkGroup> iterateGroups() {
+ return groups.iterator();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java
new file mode 100644
index 0000000..424dd79
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignatureFactory;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.LinkGroup;
+
+public class PlanMapperProcess {
+
+ private static class OpTreeSignatureMapper implements GroupTransformer {
+
+ private OpTreeSignatureFactory cache = OpTreeSignatureFactory.newCache();
+
+ @Override
+ public void map(LinkGroup group) {
+ List<Operator> operators= group.getAll(Operator.class);
+ for (Operator op : operators) {
+ group.add(OpTreeSignature.of(op,cache));
+ }
+ }
+ }
+
+ public static void runPostProcess(PlanMapper planMapper) {
+ planMapper.runMapper(new OpTreeSignatureMapper());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java
new file mode 100644
index 0000000..21a0678
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.Optional;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+
+public interface RuntimeStatsSource extends StatsSource {
+ public Optional<OperatorStats> lookup(Operator<?> tsop);
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
new file mode 100644
index 0000000..6f340b8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+
+public class SimpleRuntimeStatsSource implements RuntimeStatsSource {
+
+ private final PlanMapper pm;
+
+
+ public SimpleRuntimeStatsSource(PlanMapper pm) {
+ PlanMapperProcess.runPostProcess(pm);
+ this.pm = pm;
+ }
+
+ @Override
+ public Optional<OperatorStats> lookup(Operator<?> op) {
+ try {
+ OpTreeSignature sig = OpTreeSignature.of(op);
+ List<OperatorStats> v = pm.lookupAll(OperatorStats.class, sig);
+ if (v.size() > 0) {
+ return Optional.of(v.get(0));
+ }
+ return Optional.empty();
+ } catch (NoSuchElementException | IllegalArgumentException iae) {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public boolean canProvideStatsFor(Class<?> class1) {
+ if (Operator.class.isAssignableFrom(class1)) {
+ return true;
+ }
+ if (HiveFilter.class.isAssignableFrom(class1)) {
+ return true;
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
new file mode 100644
index 0000000..a4cb6e9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+public interface StatsSource {
+
+ boolean canProvideStatsFor(Class<?> class1);
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
new file mode 100644
index 0000000..2b0d23c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.reexec;
+
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+
+/**
+ * Defines an interface for re-execution logics.
+ *
+ * FIXME: rethink methods.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface IReExecutionPlugin {
+
+ /**
+ * Called when the {@link Driver} is being initialized
+ *
+ * The plugin may add hooks/etc to tap into the system.
+ */
+ void initialize(Driver driver);
+
+ /**
+ * Called before executing the query.
+ */
+ void beforeExecute(int executionIndex, boolean explainReOptimization);
+
+ /**
+ * The query have failed, does this plugin advises to re-execute it again?
+ */
+ boolean shouldReExecute(int executionNum);
+
+ /**
+ * The plugin should prepare for the re-compilaton of the query.
+ */
+ void prepareToReExecute();
+
+ /**
+ * The query have failed; and have been recompiled - does this plugin advises to re-execute it again?
+ */
+ boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper);
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
new file mode 100644
index 0000000..9303171
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.reexec;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.QueryDisplay;
+import org.apache.hadoop.hive.ql.QueryInfo;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Enables to use re-execution logics.
+ *
+ * Covers the IDriver interface, handles query re-execution; and asks clear questions from the underlying re-execution plugins.
+ */
+public class ReExecDriver implements IDriver {
+
+ private class HandleReOptimizationExplain implements HiveSemanticAnalyzerHook {
+
+ @Override
+ public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast) throws SemanticException {
+ if (ast.getType() == HiveParser.TOK_EXPLAIN) {
+ int childCount = ast.getChildCount();
+ for (int i = 1; i < childCount; i++) {
+ if (ast.getChild(i).getType() == HiveParser.KW_REOPTIMIZATION) {
+ explainReOptimization = true;
+ ast.deleteChild(i);
+ break;
+ }
+ }
+ if (explainReOptimization && firstExecution()) {
+ Tree execTree = ast.getChild(0);
+ execTree.setParent(ast.getParent());
+ ast.getParent().setChild(0, execTree);
+ return (ASTNode) execTree;
+ }
+ }
+ return ast;
+ }
+
+ @Override
+ public void postAnalyze(HiveSemanticAnalyzerHookContext context, List<Task<? extends Serializable>> rootTasks)
+ throws SemanticException {
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReExecDriver.class);
+ private boolean explainReOptimization;
+ protected Driver coreDriver;
+ private QueryState queryState;
+ private String currentQuery;
+ private int executionIndex;
+
+ private ArrayList<IReExecutionPlugin> plugins;
+
+ @Override
+ public HiveConf getConf() {
+ return queryState.getConf();
+ }
+
+ public boolean firstExecution() {
+ return executionIndex == 0;
+ }
+
+ public ReExecDriver(QueryState queryState, String userName, QueryInfo queryInfo,
+ ArrayList<IReExecutionPlugin> plugins) {
+ this.queryState = queryState;
+ coreDriver = new Driver(queryState, userName, queryInfo, null);
+ coreDriver.getHookRunner().addSemanticAnalyzerHook(new HandleReOptimizationExplain());
+ this.plugins = plugins;
+
+ for (IReExecutionPlugin p : plugins) {
+ p.initialize(coreDriver);
+ }
+ }
+
+ @Override
+ public int compile(String string) {
+ return coreDriver.compile(string);
+ }
+
+ @Override
+ public CommandProcessorResponse compileAndRespond(String statement) {
+ currentQuery = statement;
+ return coreDriver.compileAndRespond(statement);
+ }
+
+ @Override
+ public QueryPlan getPlan() {
+ return coreDriver.getPlan();
+ }
+
+ @Override
+ public QueryDisplay getQueryDisplay() {
+ return coreDriver.getQueryDisplay();
+ }
+
+ @Override
+ public void setOperationId(String guid64) {
+ coreDriver.setOperationId(guid64);
+ }
+
+ @Override
+ public CommandProcessorResponse run() {
+ executionIndex = 0;
+ int maxExecutuions = 1 + coreDriver.getConf().getIntVar(ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT);
+
+
+ while (true) {
+ executionIndex++;
+ for (IReExecutionPlugin p : plugins) {
+ p.beforeExecute(executionIndex, explainReOptimization);
+ }
+ coreDriver.getContext().setExecutionIndex(executionIndex);
+ LOG.info("Execution #{} of query", executionIndex);
+ CommandProcessorResponse cpr = coreDriver.run();
+
+ boolean shouldReExecute = explainReOptimization && executionIndex==1;
+ shouldReExecute |= cpr.getResponseCode() != 0 && shouldReExecute();
+
+ if (executionIndex >= maxExecutuions || !shouldReExecute) {
+ return cpr;
+ }
+ LOG.info("Preparing to re-execute query");
+ prepareToReExecute();
+ PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
+ CommandProcessorResponse compile_resp = coreDriver.compileAndRespond(currentQuery);
+ if (compile_resp.failed()) {
+ // FIXME: somehow place pointers that re-execution compilation have failed; the query have been successfully compiled before?
+ return compile_resp;
+ }
+
+ PlanMapper newPlanMapper = coreDriver.getPlanMapper();
+ if (!explainReOptimization && !shouldReExecuteAfterCompile(oldPlanMapper, newPlanMapper)) {
+ // FIXME: retain old error; or create a new one?
+ return cpr;
+ }
+ }
+ }
+
+ private boolean shouldReExecuteAfterCompile(PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
+ boolean ret = false;
+ for (IReExecutionPlugin p : plugins) {
+ ret |= p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper);
+ }
+ return ret;
+ }
+
+ private boolean shouldReExecute() {
+ boolean ret = false;
+ for (IReExecutionPlugin p : plugins) {
+ ret |= p.shouldReExecute(executionIndex);
+ }
+ return ret;
+ }
+
+ @Override
+ public CommandProcessorResponse run(String command) {
+ CommandProcessorResponse r0 = compileAndRespond(command);
+ if (r0.getResponseCode() != 0) {
+ return r0;
+ }
+ return run();
+ }
+
+ protected void prepareToReExecute() {
+ for (IReExecutionPlugin p : plugins) {
+ p.prepareToReExecute();
+ }
+ }
+
+ @Override
+ public boolean getResults(List res) throws IOException {
+ return coreDriver.getResults(res);
+ }
+
+ @Override
+ public void setMaxRows(int maxRows) {
+ coreDriver.setMaxRows(maxRows);
+ }
+
+ @Override
+ public FetchTask getFetchTask() {
+ return coreDriver.getFetchTask();
+ }
+
+ @Override
+ public Schema getSchema() {
+ return coreDriver.getSchema();
+ }
+
+ @Override
+ public boolean isFetchingTable() {
+ return coreDriver.isFetchingTable();
+ }
+
+ @Override
+ public void resetFetch() throws IOException {
+ coreDriver.resetFetch();
+ }
+
+ @Override
+ public void close() {
+ coreDriver.close();
+ }
+
+ @Override
+ public void destroy() {
+ coreDriver.destroy();
+ }
+
+ @Override
+ public final Context getContext() {
+ return coreDriver.getContext();
+ }
+
+ @VisibleForTesting
+ public void setRuntimeStatsSource(SimpleRuntimeStatsSource statsSource) {
+ coreDriver.setRuntimeStatsSource(statsSource);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
new file mode 100644
index 0000000..4ee3c14
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.reexec;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+
+/**
+ * Re-Executes a query only adding an extra overlay
+ */
+public class ReExecutionOverlayPlugin implements IReExecutionPlugin {
+
+ private Driver driver;
+ private Map<String, String> subtree;
+
+ class LocalHook implements ExecuteWithHookContext {
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ if (hookContext.getHookType() == HookType.ON_FAILURE_HOOK) {
+ Throwable exception = hookContext.getException();
+ if (exception != null) {
+ if (exception.getMessage().contains("Vertex failed,")) {
+ retryPossible = true;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void initialize(Driver driver) {
+ this.driver = driver;
+ driver.getHookRunner().addOnFailureHook(new LocalHook());
+ HiveConf conf = driver.getConf();
+ subtree = conf.subtree("reexec.overlay");
+ }
+
+ private boolean retryPossible;
+
+ @Override
+ public void prepareToReExecute() {
+ HiveConf conf = driver.getConf();
+ conf.verifyAndSetAll(subtree);
+ }
+
+ @Override
+ public boolean shouldReExecute(int executionNum) {
+ return executionNum == 1 && !subtree.isEmpty() && retryPossible;
+ }
+
+ @Override
+ public boolean shouldReExecute(int executionNum, PlanMapper pm1, PlanMapper pm2) {
+ return executionNum == 1;
+ }
+
+ @Override
+ public void beforeExecute(int executionIndex, boolean explainReOptimization) {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
new file mode 100644
index 0000000..7078587
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.reexec;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource;
+import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class ReOptimizePlugin implements IReExecutionPlugin {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReOptimizePlugin.class);
+
+ private boolean retryPossible;
+
+ private Driver coreDriver;
+
+ private OperatorStatsReaderHook statsReaderHook;
+
+ class LocalHook implements ExecuteWithHookContext {
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ if (hookContext.getHookType() == HookType.ON_FAILURE_HOOK) {
+ Throwable exception = hookContext.getException();
+ if (exception != null) {
+ {
+ String message = exception.getMessage();
+ if (message != null) {
+ boolean isOOM = message.contains(MapJoinMemoryExhaustionError.class.getName())
+ || message.contains(OutOfMemoryError.class.getName());
+ if (message.contains("Vertex failed,") && isOOM) {
+ retryPossible = true;
+ }
+ System.out.println(exception);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void initialize(Driver driver) {
+ coreDriver = driver;
+ coreDriver.getHookRunner().addOnFailureHook(new LocalHook());
+ statsReaderHook = new OperatorStatsReaderHook();
+ coreDriver.getHookRunner().addOnFailureHook(statsReaderHook);
+ coreDriver.getHookRunner().addPostHook(statsReaderHook);
+ // statsReaderHook.setCollectOnSuccess(true);
+ statsReaderHook.setCollectOnSuccess(
+ driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS));
+ }
+
+ @Override
+ public boolean shouldReExecute(int executionNum) {
+ return retryPossible;
+ }
+
+ @Override
+ public void prepareToReExecute() {
+ statsReaderHook.setCollectOnSuccess(true);
+ PlanMapper pm = coreDriver.getContext().getPlanMapper();
+ coreDriver.setRuntimeStatsSource(new SimpleRuntimeStatsSource(pm));
+ retryPossible = false;
+ }
+
+ @Override
+ public boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
+ return planDidChange(oldPlanMapper, newPlanMapper);
+ }
+
+ private boolean planDidChange(PlanMapper pmL, PlanMapper pmR) {
+ List<Operator> opsL = getRootOps(pmL);
+ List<Operator> opsR = getRootOps(pmR);
+ for (Iterator<Operator> itL = opsL.iterator(); itL.hasNext();) {
+ Operator<?> opL = itL.next();
+ for (Iterator<Operator> itR = opsR.iterator(); itR.hasNext();) {
+ Operator<?> opR = itR.next();
+ if (opL.logicalEqualsTree(opR)) {
+ itL.remove();
+ itR.remove();
+ break;
+ }
+ }
+ }
+ return opsL.isEmpty() && opsR.isEmpty();
+ }
+
+ private List<Operator> getRootOps(PlanMapper pmL) {
+ List<Operator> ops = pmL.getAll(Operator.class);
+ for (Iterator<Operator> iterator = ops.iterator(); iterator.hasNext();) {
+ Operator o = iterator.next();
+ if (o.getNumChild() != 0) {
+ iterator.remove();
+ }
+ }
+ return ops;
+ }
+
+ @Override
+ public void beforeExecute(int executionIndex, boolean explainReOptimization) {
+ if (explainReOptimization) {
+ statsReaderHook.setCollectOnSuccess(true);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
new file mode 100644
index 0000000..52e18a8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.stats;
+
/**
 * Simple mutable holder for the runtime statistics of a single operator.
 *
 * Carries the operator identifier together with the number of records the operator
 * produced; the record count stays -1 until it is explicitly set.
 */
public class OperatorStats {
  private final String operatorId;
  private long outputRecords;

  public OperatorStats(final String opId) {
    operatorId = opId;
    // -1 marks "not yet collected"
    outputRecords = -1;
  }

  public String getOperatorId() {
    return operatorId;
  }

  public long getOutputRecords() {
    return outputRecords;
  }

  public void setOutputRecords(final long outputRecords) {
    this.outputRecords = outputRecords;
  }

  @Override
  public String toString() {
    return "OperatorStats " + operatorId + " records: " + outputRecords;
  }
}