You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/01/19 03:47:38 UTC
[4/4] hive git commit: HIVE-12758 : Parallel compilation:
Operator::resetId() is not thread-safe (Sergey Shelukhin, reviewed by Gopal V)
HIVE-12758 : Parallel compilation: Operator::resetId() is not thread-safe (Sergey Shelukhin, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/88fceaca
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/88fceaca
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/88fceaca
Branch: refs/heads/master
Commit: 88fceacaa1f4d9d2d4e56850006d4d1ddfdbf102
Parents: 4eab2df
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Jan 18 18:47:17 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Jan 18 18:47:17 2016 -0800
----------------------------------------------------------------------
.../mapreduce/TestHCatMultiOutputFormat.java | 4 +-
.../hadoop/hive/ql/CompilationOpContext.java | 36 +++
.../java/org/apache/hadoop/hive/ql/Context.java | 5 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 10 +-
.../hive/ql/exec/AbstractFileMergeOperator.java | 10 +
.../hive/ql/exec/AbstractMapJoinOperator.java | 9 +-
.../hive/ql/exec/AppMasterEventOperator.java | 10 +
.../hadoop/hive/ql/exec/CollectOperator.java | 10 +
.../hadoop/hive/ql/exec/ColumnStatsTask.java | 8 +-
.../hive/ql/exec/ColumnStatsUpdateTask.java | 6 +-
.../hadoop/hive/ql/exec/CommonJoinOperator.java | 12 +-
.../hive/ql/exec/CommonMergeJoinOperator.java | 8 +-
.../hadoop/hive/ql/exec/ConditionalTask.java | 5 -
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 19 +-
.../hadoop/hive/ql/exec/DemuxOperator.java | 10 +
.../hadoop/hive/ql/exec/DummyStoreOperator.java | 8 +-
.../apache/hadoop/hive/ql/exec/FetchTask.java | 8 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 10 +
.../hadoop/hive/ql/exec/FilterOperator.java | 8 +-
.../hadoop/hive/ql/exec/ForwardOperator.java | 10 +
.../hadoop/hive/ql/exec/FunctionTask.java | 6 +-
.../hadoop/hive/ql/exec/GroupByOperator.java | 10 +
.../hive/ql/exec/HashTableDummyOperator.java | 10 +
.../hive/ql/exec/HashTableSinkOperator.java | 12 +-
.../hadoop/hive/ql/exec/JoinOperator.java | 10 +
.../ql/exec/LateralViewForwardOperator.java | 10 +
.../hive/ql/exec/LateralViewJoinOperator.java | 10 +
.../hadoop/hive/ql/exec/LimitOperator.java | 10 +
.../hadoop/hive/ql/exec/ListSinkOperator.java | 10 +
.../hadoop/hive/ql/exec/MapJoinOperator.java | 9 +-
.../apache/hadoop/hive/ql/exec/MapOperator.java | 10 +
.../apache/hadoop/hive/ql/exec/MuxOperator.java | 10 +
.../apache/hadoop/hive/ql/exec/Operator.java | 57 ++--
.../hadoop/hive/ql/exec/OperatorFactory.java | 287 ++++++++++---------
.../hive/ql/exec/OrcFileMergeOperator.java | 10 +
.../apache/hadoop/hive/ql/exec/PTFOperator.java | 10 +
.../hive/ql/exec/RCFileMergeOperator.java | 11 +
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 10 +
.../hadoop/hive/ql/exec/SMBMapJoinOperator.java | 9 +-
.../hadoop/hive/ql/exec/ScriptOperator.java | 10 +
.../hadoop/hive/ql/exec/SelectOperator.java | 10 +
.../hive/ql/exec/SerializationUtilities.java | 51 +++-
.../ql/exec/SparkHashTableSinkOperator.java | 12 +-
.../hadoop/hive/ql/exec/StatsNoJobTask.java | 6 +-
.../hadoop/hive/ql/exec/TableScanOperator.java | 10 +
.../org/apache/hadoop/hive/ql/exec/Task.java | 4 +-
.../hive/ql/exec/TemporaryHashSinkOperator.java | 4 +-
.../hadoop/hive/ql/exec/TerminalOperator.java | 9 +
.../hive/ql/exec/TezDummyStoreOperator.java | 10 +
.../hadoop/hive/ql/exec/UDTFOperator.java | 10 +
.../hadoop/hive/ql/exec/UnionOperator.java | 12 +-
.../hadoop/hive/ql/exec/mr/ExecDriver.java | 6 +-
.../hadoop/hive/ql/exec/mr/ExecMapper.java | 6 +-
.../hadoop/hive/ql/exec/mr/HashTableLoader.java | 3 +-
.../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 6 +-
.../hive/ql/exec/spark/HashTableLoader.java | 3 +-
.../ql/exec/spark/SparkMapRecordHandler.java | 6 +-
.../hadoop/hive/ql/exec/spark/SparkTask.java | 6 +-
.../hive/ql/exec/tez/MapRecordProcessor.java | 10 +-
.../vector/VectorAppMasterEventOperator.java | 15 +-
.../ql/exec/vector/VectorFileSinkOperator.java | 14 +-
.../ql/exec/vector/VectorFilterOperator.java | 14 +-
.../ql/exec/vector/VectorGroupByOperator.java | 15 +-
.../ql/exec/vector/VectorLimitOperator.java | 12 +-
.../exec/vector/VectorMapJoinBaseOperator.java | 14 +-
.../ql/exec/vector/VectorMapJoinOperator.java | 14 +-
.../VectorMapJoinOuterFilteredOperator.java | 14 +-
.../hive/ql/exec/vector/VectorMapOperator.java | 10 +
.../exec/vector/VectorReduceSinkOperator.java | 14 +-
.../exec/vector/VectorSMBMapJoinOperator.java | 14 +-
.../ql/exec/vector/VectorSelectOperator.java | 14 +-
.../VectorSparkHashTableSinkOperator.java | 14 +-
...VectorSparkPartitionPruningSinkOperator.java | 15 +-
.../mapjoin/VectorMapJoinCommonOperator.java | 14 +-
.../VectorMapJoinGenerateResultOperator.java | 14 +-
...pJoinInnerBigOnlyGenerateResultOperator.java | 14 +-
.../VectorMapJoinInnerBigOnlyLongOperator.java | 13 +-
...ctorMapJoinInnerBigOnlyMultiKeyOperator.java | 13 +-
...VectorMapJoinInnerBigOnlyStringOperator.java | 13 +-
...ectorMapJoinInnerGenerateResultOperator.java | 14 +-
.../mapjoin/VectorMapJoinInnerLongOperator.java | 13 +-
.../VectorMapJoinInnerMultiKeyOperator.java | 13 +-
.../VectorMapJoinInnerStringOperator.java | 13 +-
...orMapJoinLeftSemiGenerateResultOperator.java | 14 +-
.../VectorMapJoinLeftSemiLongOperator.java | 13 +-
.../VectorMapJoinLeftSemiMultiKeyOperator.java | 13 +-
.../VectorMapJoinLeftSemiStringOperator.java | 13 +-
...ectorMapJoinOuterGenerateResultOperator.java | 14 +-
.../mapjoin/VectorMapJoinOuterLongOperator.java | 13 +-
.../VectorMapJoinOuterMultiKeyOperator.java | 13 +-
.../VectorMapJoinOuterStringOperator.java | 13 +-
.../VectorReduceSinkCommonOperator.java | 14 +-
.../VectorReduceSinkLongOperator.java | 14 +-
.../VectorReduceSinkMultiKeyOperator.java | 14 +-
.../VectorReduceSinkStringOperator.java | 14 +-
.../apache/hadoop/hive/ql/hooks/ATSHook.java | 2 +-
.../hadoop/hive/ql/io/merge/MergeFileTask.java | 5 +-
.../ql/io/rcfile/stats/PartialScanTask.java | 7 +-
.../io/rcfile/truncate/ColumnTruncateTask.java | 5 +-
.../hive/ql/optimizer/AbstractSMBJoinProc.java | 2 +-
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 8 +-
.../DynamicPartitionPruningOptimization.java | 4 +-
.../hive/ql/optimizer/GenMRTableScan1.java | 2 +-
.../hive/ql/optimizer/GenMapRedUtils.java | 23 +-
.../hive/ql/optimizer/GroupByOptimizer.java | 4 +-
.../hive/ql/optimizer/MapJoinProcessor.java | 12 +-
.../ql/optimizer/ReduceSinkMapJoinProc.java | 4 +-
.../ql/optimizer/SimpleFetchAggregation.java | 3 +-
.../hive/ql/optimizer/SimpleFetchOptimizer.java | 3 +-
.../hive/ql/optimizer/SkewJoinOptimizer.java | 8 +-
.../calcite/translator/HiveGBOpConvUtil.java | 10 +-
.../calcite/translator/HiveOpConverter.java | 34 +--
.../QueryPlanTreeTransformation.java | 12 +-
.../physical/CommonJoinTaskDispatcher.java | 4 +-
.../physical/GenMRSkewJoinProcessor.java | 7 +-
.../physical/GenSparkSkewJoinProcessor.java | 13 +-
.../physical/LocalMapJoinProcFactory.java | 5 +-
.../hive/ql/optimizer/physical/Vectorizer.java | 15 +-
.../spark/SparkReduceSinkMapJoinProc.java | 7 +-
.../optimizer/unionproc/UnionProcFactory.java | 3 +-
.../hive/ql/parse/BaseSemanticAnalyzer.java | 2 +
.../hive/ql/parse/ExplainSemanticAnalyzer.java | 2 +-
.../apache/hadoop/hive/ql/parse/GenTezWork.java | 5 +-
.../hive/ql/parse/ProcessAnalyzeTable.java | 3 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 31 +-
.../hive/ql/parse/spark/GenSparkWork.java | 3 +-
.../SparkPartitionPruningSinkOperator.java | 10 +
.../parse/spark/SparkProcessAnalyzeTable.java | 3 +-
.../apache/hadoop/hive/ql/plan/BaseWork.java | 1 +
.../hadoop/hive/ql/plan/ColumnStatsWork.java | 5 +-
.../apache/hadoop/hive/ql/plan/FetchWork.java | 5 +-
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 5 +
.../apache/hadoop/hive/ql/plan/MapredWork.java | 7 +-
.../hadoop/hive/ql/plan/MergeJoinWork.java | 5 +
.../apache/hadoop/hive/ql/plan/ReduceWork.java | 6 +
.../apache/hadoop/hive/ql/plan/UnionWork.java | 5 +
.../hadoop/hive/ql/ppd/OpProcFactory.java | 3 +-
.../ql/ppd/PredicateTransitivePropagate.java | 4 +-
.../hive/ql/ppd/SyntheticJoinPredicate.java | 4 +-
.../hadoop/hive/ql/exec/TestExecDriver.java | 38 +--
.../hive/ql/exec/TestFileSinkOperator.java | 4 +-
.../hadoop/hive/ql/exec/TestOperators.java | 17 +-
.../apache/hadoop/hive/ql/exec/TestPlan.java | 3 +-
.../exec/vector/TestVectorFilterOperator.java | 3 +-
.../exec/vector/TestVectorGroupByOperator.java | 58 ++--
.../ql/exec/vector/TestVectorLimitOperator.java | 3 +-
.../exec/vector/TestVectorSelectOperator.java | 10 +-
.../vector/util/FakeCaptureOutputOperator.java | 14 +-
.../util/FakeVectorDataSourceOperator.java | 16 +-
.../ql/optimizer/physical/TestVectorizer.java | 7 +-
.../hadoop/hive/ql/parse/TestGenTezWork.java | 8 +-
.../parse/TestUpdateDeleteSemanticAnalyzer.java | 2 +-
.../hive/ql/testutil/BaseScalarUdfTest.java | 3 +-
.../results/clientpositive/auto_join0.q.out | 8 +-
.../cbo_rp_cross_product_check_2.q.out | 4 +-
.../clientpositive/cross_product_check_2.q.out | 4 +-
.../subquery_multiinsert.q.java1.7.out | 4 +-
.../subquery_multiinsert.q.java1.8.out | 60 ++--
158 files changed, 1379 insertions(+), 598 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
index 8148faa..61efc1a 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -40,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -392,7 +392,7 @@ public class TestHCatMultiOutputFormat {
}
FetchTask task = new FetchTask();
task.setWork(work);
- task.initialize(conf, null, null);
+ task.initialize(conf, null, null, new CompilationOpContext());
task.fetch(temp);
for (String str : temp) {
results.add(str.replace("\t", ","));
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java b/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
new file mode 100644
index 0000000..949f873
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A subset of compilation context that is passed to operators to get rid of some globals.
+ * Perhaps this should be rolled into main Context; however, some code necessitates storing the
+ * context in the operators for now, so this may not be advisable given how much stuff the main
+ * Context class contains.
+ * For now, only the operator sequence ID lives here.
+ */
+public class CompilationOpContext {
+ private final AtomicInteger opSeqId = new AtomicInteger(0);
+
+ public int nextOperatorId() {
+ return opSeqId.getAndIncrement();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index affaec8..746456b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -67,6 +67,7 @@ public class Context {
private int resDirFilesNum;
boolean initialized;
String originalTracker = null;
+ private final CompilationOpContext opContext;
private final Map<String, ContentSummary> pathToCS = new ConcurrentHashMap<String, ContentSummary>();
// scratch path to use for all non-local (ie. hdfs) file system tmp folders
@@ -133,6 +134,7 @@ public class Context {
localScratchDir = new Path(SessionState.getLocalSessionPath(conf), executionId).toUri().getPath();
scratchDirPermission = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION);
stagingDir = HiveConf.getVar(conf, HiveConf.ConfVars.STAGINGDIR);
+ opContext = new CompilationOpContext();
}
@@ -715,4 +717,7 @@ public class Context {
this.cboSucceeded = cboSucceeded;
}
+ public CompilationOpContext getOpContext() {
+ return opContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 12a7eea..75187cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -169,7 +169,7 @@ public class Driver implements CommandProcessor {
@Override
public void init() {
- Operator.resetId();
+ // Nothing for now.
}
/**
@@ -494,7 +494,7 @@ public class Driver implements CommandProcessor {
// initialize FetchTask right here
if (plan.getFetchTask() != null) {
- plan.getFetchTask().initialize(conf, plan, null);
+ plan.getFetchTask().initialize(conf, plan, null, ctx.getOpContext());
}
//do the authorization check
@@ -572,7 +572,7 @@ public class Driver implements CommandProcessor {
ASTNode astTree) throws IOException {
String ret = null;
ExplainTask task = new ExplainTask();
- task.initialize(conf, plan, null);
+ task.initialize(conf, plan, null, ctx.getOpContext());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
try {
@@ -1775,7 +1775,7 @@ public class Driver implements CommandProcessor {
cxt.incCurJobNo(1);
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}
- tsk.initialize(conf, plan, cxt);
+ tsk.initialize(conf, plan, cxt, ctx.getOpContext());
TaskResult tskRes = new TaskResult();
TaskRunner tskRun = new TaskRunner(tsk, tskRes);
@@ -1865,7 +1865,7 @@ public class Driver implements CommandProcessor {
throw new IOException("Error closing the current fetch task", e);
}
// FetchTask should not depend on the plan.
- fetchTask.initialize(conf, null, null);
+ fetchTask.initialize(conf, null, null, ctx.getOpContext());
} else {
ctx.resetStream();
resStream = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index a3ec0e1..f99bf11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
@@ -61,6 +62,15 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
protected Set<Path> incompatFileSet;
protected transient DynamicPartitionCtx dpCtx;
+ /** Kryo ctor. */
+ protected AbstractFileMergeOperator() {
+ super();
+ }
+
+ public AbstractFileMergeOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
public void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
index 7302688..69ba4a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -50,7 +51,13 @@ public abstract class AbstractMapJoinOperator <T extends MapJoinDesc> extends Co
transient int numMapRowsRead;
- public AbstractMapJoinOperator() {
+ /** Kryo ctor. */
+ protected AbstractMapJoinOperator() {
+ super();
+ }
+
+ public AbstractMapJoinOperator(CompilationOpContext ctx) {
+ super(ctx);
}
public AbstractMapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) {
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
index 7114177..743098b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.tez.TezContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
@@ -51,6 +52,15 @@ public class AppMasterEventOperator extends Operator<AppMasterEventDesc> {
protected transient boolean hasReachedMaxSize = false;
protected transient long MAX_SIZE;
+ /** Kryo ctor. */
+ protected AppMasterEventOperator() {
+ super();
+ }
+
+ public AppMasterEventOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
public void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
index e2f4f58..27ddf13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -42,6 +43,15 @@ public class CollectOperator extends Operator<CollectDesc> implements
protected transient ObjectInspector standardRowInspector;
transient int maxSize;
+ /** Kryo ctor. */
+ protected CollectOperator() {
+ super();
+ }
+
+ public CollectOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
index f6fbe74..7914471 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -81,9 +82,10 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
}
@Override
- public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
- super.initialize(conf, queryPlan, ctx);
- work.initializeForFetch();
+ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
+ CompilationOpContext opContext) {
+ super.initialize(conf, queryPlan, ctx, opContext);
+ work.initializeForFetch(opContext);
try {
JobConf job = new JobConf(conf);
ftOp = new FetchOperator(work.getfWork(), job);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
index dcbbe2e..a1b98f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -68,8 +69,9 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> {
.getLogger(ColumnStatsUpdateTask.class);
@Override
- public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
- super.initialize(conf, queryPlan, ctx);
+ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
+ CompilationOpContext opContext) {
+ super.initialize(conf, queryPlan, ctx, opContext);
}
private ColumnStatistics constructColumnStatsFromInput()
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index b0170f5..f8520f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.persistence.AbstractRowContainer;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -125,17 +126,23 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
protected transient int heartbeatInterval;
protected static final int NOTSKIPBIGTABLE = -1;
- public CommonJoinOperator() {
+ /** Kryo ctor. */
+ protected CommonJoinOperator() {
+ super();
+ }
+
+ public CommonJoinOperator(CompilationOpContext ctx) {
+ super(ctx);
}
public CommonJoinOperator(CommonJoinOperator<T> clone) {
+ super(clone.id, clone.cContext);
this.joinEmitInterval = clone.joinEmitInterval;
this.joinCacheSize = clone.joinCacheSize;
this.nextSz = clone.nextSz;
this.childOperators = clone.childOperators;
this.parentOperators = clone.parentOperators;
this.done = clone.done;
- this.operatorId = clone.operatorId;
this.storage = clone.storage;
this.condn = clone.condn;
this.conf = clone.getConf();
@@ -150,7 +157,6 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
this.groupKeyObject = clone.groupKeyObject;
this.handleSkewJoin = clone.handleSkewJoin;
this.hconf = clone.hconf;
- this.id = clone.id;
this.inputObjInspectors = clone.inputObjInspectors;
this.noOuterJoin = clone.noOuterJoin;
this.numAliases = clone.numAliases;
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 1cbd13d..8693200 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource;
@@ -90,10 +91,15 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
new ArrayList<Operator<? extends OperatorDesc>>();
transient Set<Integer> fetchInputAtClose;
- public CommonMergeJoinOperator() {
+ /** Kryo ctor. */
+ protected CommonMergeJoinOperator() {
super();
}
+ public CommonMergeJoinOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@SuppressWarnings("unchecked")
@Override
public void initializeOp(Configuration hconf) throws HiveException {
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
index 031331e..c96c813 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
@@ -72,11 +72,6 @@ public class ConditionalTask extends Task<ConditionalWork> implements Serializab
}
@Override
- public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
- super.initialize(conf, queryPlan, driverContext);
- }
-
- @Override
public int execute(DriverContext driverContext) {
resTasks = resolver.getTasks(conf, resolverCtx);
resolved = true;
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 5f3a9cf..94bb73c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -260,8 +261,9 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
}
@Override
- public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
- super.initialize(conf, queryPlan, ctx);
+ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
+ CompilationOpContext opContext) {
+ super.initialize(conf, queryPlan, ctx, opContext);
// Pick the formatter to use to display the results. Either the
// normal human readable output or a json object.
@@ -507,7 +509,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
AlterTablePartMergeFilesDesc mergeFilesDesc = work.getMergeFilesDesc();
if (mergeFilesDesc != null) {
- return mergeFiles(db, mergeFilesDesc);
+ return mergeFiles(db, mergeFilesDesc, driverContext);
}
AlterTableAlterPartDesc alterPartDesc = work.getAlterTableAlterPartDesc();
@@ -609,8 +611,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
* @return
* @throws HiveException
*/
- private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc)
- throws HiveException {
+ private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc,
+ DriverContext driverContext) throws HiveException {
ListBucketingCtx lbCtx = mergeFilesDesc.getLbCtx();
boolean lbatc = lbCtx == null ? false : lbCtx.isSkewedStoredAsDir();
int lbd = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel();
@@ -642,7 +644,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
fmd.setListBucketingDepth(lbd);
fmd.setOutputPath(mergeFilesDesc.getOutputDir());
- Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(fmd);
+ CompilationOpContext opContext = driverContext.getCtx().getOpContext();
+ Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(opContext, fmd);
LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork =
new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
@@ -662,7 +665,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
}
// initialize the task and execute
- task.initialize(db.getConf(), getQueryPlan(), driverCxt);
+ task.initialize(db.getConf(), getQueryPlan(), driverCxt, opContext);
int ret = task.execute(driverCxt);
return ret;
}
@@ -4270,7 +4273,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
truncateWork.setMapperCannotSpanPartns(true);
DriverContext driverCxt = new DriverContext();
ColumnTruncateTask taskExec = new ColumnTruncateTask();
- taskExec.initialize(db.getConf(), null, driverCxt);
+ taskExec.initialize(db.getConf(), null, driverCxt, null);
taskExec.setWork(truncateWork);
taskExec.setQueryPlan(this.getQueryPlan());
return taskExec.execute(driverCxt);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
index 0888c7b..b897c16 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.DemuxDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -109,6 +110,15 @@ public class DemuxOperator extends Operator<DemuxDesc>
// its children's parents lists, also see childOperatorsTag in Operator) at here.
private int[][] newChildOperatorsTag;
+ /** Kryo ctor. */
+ protected DemuxOperator() {
+ super();
+ }
+
+ public DemuxOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
index 0c12570..06a3884 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -69,10 +70,15 @@ public class DummyStoreOperator extends Operator<DummyStoreDesc> implements Seri
protected transient InspectableObject result;
- public DummyStoreOperator() {
+ /** Kryo ctor. */
+ protected DummyStoreOperator() {
super();
}
+ public DummyStoreOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 1634143..4415328 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
@@ -59,9 +60,10 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
}
@Override
- public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
- super.initialize(conf, queryPlan, ctx);
- work.initializeForFetch();
+ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
+ CompilationOpContext opContext) {
+ super.initialize(conf, queryPlan, ctx, opContext);
+ work.initializeForFetch(opContext);
try {
// Create a file system handle
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 32bfcf5..2fa3d96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -322,6 +323,15 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
childSpecPathDynLinkedPartitions = conf.getDirName().getName();
}
+ /** Kryo ctor. */
+ protected FileSinkOperator() {
+ super();
+ }
+
+ public FileSinkOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index 0e7e79d..08f2633 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,8 +46,13 @@ public class FilterOperator extends Operator<FilterDesc> implements
private transient IOContext ioContext;
protected transient int heartbeatInterval;
- public FilterOperator() {
+ /** Kryo ctor. */
+ protected FilterOperator() {
super();
+ }
+
+ public FilterOperator(CompilationOpContext ctx) {
+ super(ctx);
consecutiveSearches = 0;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
index 7a4c58a..2df7cca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ForwardDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -61,6 +62,15 @@ public class ForwardOperator extends Operator<ForwardDesc> implements
return "FOR";
}
+ /** Kryo ctor. */
+ protected ForwardOperator() {
+ super();
+ }
+
+ public ForwardOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
index ec755a8..ed6f062 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.ResourceType;
import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -60,8 +61,9 @@ public class FunctionTask extends Task<FunctionWork> {
}
@Override
- public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
- super.initialize(conf, queryPlan, ctx);
+ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
+ CompilationOpContext opContext) {
+ super.initialize(conf, queryPlan, ctx, opContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 1693ec3..0839b42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -37,6 +37,7 @@ import javolution.util.FastBitSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
@@ -179,6 +180,15 @@ public class GroupByOperator extends Operator<GroupByDesc> {
return bits;
}
+ /** Kryo ctor. */
+ protected GroupByOperator() {
+ super();
+ }
+
+ public GroupByOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
index 1de8c76..4749247 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -32,6 +33,15 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
public class HashTableDummyOperator extends Operator<HashTableDummyDesc> implements Serializable {
private static final long serialVersionUID = 1L;
+ /** Kryo ctor. */
+ protected HashTableDummyOperator() {
+ super();
+ }
+
+ public HashTableDummyOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index 76308f6..deb7c76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
@@ -104,10 +105,17 @@ public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> i
private long hashTableScale;
private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
- public HashTableSinkOperator() {
+ /** Kryo ctor. */
+ protected HashTableSinkOperator() {
+ super();
}
- public HashTableSinkOperator(MapJoinOperator mjop) {
+ public HashTableSinkOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
+ public HashTableSinkOperator(CompilationOpContext ctx, MapJoinOperator mjop) {
+ this(ctx);
this.conf = new HashTableSinkDesc(mjop.getConf());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index 3453fc9..08cc4b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -55,6 +56,15 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial
private final transient LongWritable skewjoin_followup_jobs = new LongWritable(0);
+ /** Kryo ctor. */
+ protected JoinOperator() {
+ super();
+ }
+
+ public JoinOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
index e866eed..4c94ad9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -55,6 +56,15 @@ public class LateralViewForwardOperator extends Operator<LateralViewForwardDesc>
return OperatorType.LATERALVIEWFORWARD;
}
+ /** Kryo ctor. */
+ protected LateralViewForwardOperator() {
+ super();
+ }
+
+ public LateralViewForwardOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
index 55bb08f..7407dc6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -83,6 +84,15 @@ public class LateralViewJoinOperator extends Operator<LateralViewJoinDesc> {
public static final byte SELECT_TAG = 0;
public static final byte UDTF_TAG = 1;
+ /** Kryo ctor. */
+ protected LateralViewJoinOperator() {
+ super();
+ }
+
+ public LateralViewJoinOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
index fc85bea..239d56b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -39,6 +40,15 @@ public class LimitOperator extends Operator<LimitDesc> implements Serializable {
protected transient int currCount;
protected transient boolean isMap;
+ /** Kryo ctor. */
+ protected LimitOperator() {
+ super();
+ }
+
+ public LimitOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
index 919e72f..2f2abc1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
@@ -24,6 +24,7 @@ import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -43,6 +44,15 @@ public class ListSinkOperator extends Operator<ListSinkDesc> {
private transient FetchFormatter fetcher;
private transient int numRows;
+ /** Kryo ctor. */
+ protected ListSinkOperator() {
+ super();
+ }
+
+ public ListSinkOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index dc0b85e..91b5ca7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap;
@@ -97,7 +98,13 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
protected HybridHashTableContainer firstSmallTable; // The first small table;
// Only this table has spilled big table rows
- public MapJoinOperator() {
+ /** Kryo ctor. */
+ protected MapJoinOperator() {
+ super();
+ }
+
+ public MapJoinOperator(CompilationOpContext ctx) {
+ super(ctx);
}
public MapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) {
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index ec0d95c..2b690f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.MapOperator.MapOpCtx;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -463,6 +464,15 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
return nominal;
}
+ /** Kryo ctor. */
+ protected MapOperator() {
+ super();
+ }
+
+ public MapOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
public void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
index 4f4abd3..d8444fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.MuxDesc;
@@ -170,6 +171,15 @@ public class MuxOperator extends Operator<MuxDesc> implements Serializable{
private transient long[] cntrs;
private transient long[] nextCntrs;
+ /** Kryo ctor. */
+ protected MuxOperator() {
+ super();
+ }
+
+ public MuxOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 85ab6b2..dbe4f80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -30,11 +30,11 @@ import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -67,6 +67,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
public static final String CONTEXT_NAME_KEY = "__hive.context.name";
private transient Configuration configuration;
+ protected transient CompilationOpContext cContext;
protected List<Operator<? extends OperatorDesc>> childOperators;
protected List<Operator<? extends OperatorDesc>> parentOperators;
protected String operatorId;
@@ -75,8 +76,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
private transient boolean rootInitializeCalled = false;
protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>();
- private static AtomicInteger seqId;
-
// It can be optimized later so that an operator operator (init/close) is performed
// only after that operation has been performed on all the parents. This will require
// initializing the whole tree in all the mappers (which might be required for mappers
@@ -98,38 +97,24 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
protected transient State state = State.UNINIT;
- static {
- seqId = new AtomicInteger(0);
- }
-
private boolean useBucketizedHiveInputFormat;
// dummy operator (for not increasing seqId)
- private Operator(String name) {
- id = name;
+ protected Operator(String name, CompilationOpContext cContext) {
+ this();
+ this.cContext = cContext;
+ this.id = name;
initOperatorId();
+ }
+
+ protected Operator() {
childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
abortOp = new AtomicBoolean(false);
}
- public Operator() {
- this(String.valueOf(seqId.getAndIncrement()));
- }
-
- public static void resetId() {
- seqId.set(0);
- }
-
- /**
- * Create an operator with a reporter.
- *
- * @param reporter
- * Used to report progress of certain operators.
- */
- public Operator(Reporter reporter) {
- this();
- this.reporter = reporter;
+ public Operator(CompilationOpContext cContext) {
+ this(String.valueOf(cContext.nextOperatorId()), cContext);
}
public void setChildOperators(
@@ -228,7 +213,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
protected transient final boolean isLogTraceEnabled = LOG.isTraceEnabled() && PLOG.isTraceEnabled();
protected transient String alias;
protected transient Reporter reporter;
- protected transient String id;
+ protected String id;
// object inspectors for input rows
// We will increase the size of the array on demand
protected transient ObjectInspector[] inputObjInspectors = new ObjectInspector[1];
@@ -1129,8 +1114,8 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
@SuppressWarnings("unchecked")
T descClone = (T)conf.clone();
// also clone the colExprMap by default
- Operator<? extends OperatorDesc> ret =
- OperatorFactory.getAndMakeChild(descClone, getSchema(), getColumnExprMap(), parentClones);
+ Operator<? extends OperatorDesc> ret = OperatorFactory.getAndMakeChild(
+ cContext, descClone, getSchema(), getColumnExprMap(), parentClones);
return ret;
}
@@ -1145,8 +1130,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
T descClone = (T) conf.clone();
Operator<? extends OperatorDesc> ret =
- OperatorFactory.getAndMakeChild(
- descClone, getSchema());
+ OperatorFactory.getAndMakeChild(cContext, descClone, getSchema());
return ret;
}
@@ -1355,7 +1339,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
private static class DummyOperator extends Operator {
- public DummyOperator() { super("dummy"); }
+ public DummyOperator() { super("dummy", null); }
@Override
public void process(Object row, int tag) {
@@ -1384,4 +1368,13 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
public String getReduceOutputName() {
return null;
}
+
+ public void setCompilationOpContext(CompilationOpContext ctx) {
+ cContext = ctx;
+ }
+
+ /** @return Compilation operator context. Only available during compilation. */
+ public CompilationOpContext getCompilationOpContext() {
+ return cContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index f619a56..038b96c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.hive.ql.exec;
import java.util.ArrayList;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -72,6 +74,8 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import com.google.common.base.Preconditions;
+
/**
* OperatorFactory.
*
@@ -79,97 +83,68 @@ import org.apache.hadoop.hive.ql.plan.UnionDesc;
@SuppressWarnings({ "rawtypes", "unchecked" })
public final class OperatorFactory {
protected static transient final Logger LOG = LoggerFactory.getLogger(OperatorFactory.class);
- private static final List<OpTuple> opvec;
- private static final List<OpTuple> vectorOpvec;
+ private static final IdentityHashMap<Class<? extends OperatorDesc>,
+ Class<? extends Operator<? extends OperatorDesc>>> opvec = new IdentityHashMap<>();
+ private static final IdentityHashMap<Class<? extends OperatorDesc>,
+ Class<? extends Operator<? extends OperatorDesc>>> vectorOpvec = new IdentityHashMap<>();
static {
- opvec = new ArrayList<OpTuple>();
- opvec.add(new OpTuple<FilterDesc>(FilterDesc.class, FilterOperator.class));
- opvec.add(new OpTuple<SelectDesc>(SelectDesc.class, SelectOperator.class));
- opvec.add(new OpTuple<ForwardDesc>(ForwardDesc.class, ForwardOperator.class));
- opvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, FileSinkOperator.class));
- opvec.add(new OpTuple<CollectDesc>(CollectDesc.class, CollectOperator.class));
- opvec.add(new OpTuple<ScriptDesc>(ScriptDesc.class, ScriptOperator.class));
- opvec.add(new OpTuple<PTFDesc>(PTFDesc.class, PTFOperator.class));
- opvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class, ReduceSinkOperator.class));
- opvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, GroupByOperator.class));
- opvec.add(new OpTuple<JoinDesc>(JoinDesc.class, JoinOperator.class));
- opvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, MapJoinOperator.class));
- opvec.add(new OpTuple<SMBJoinDesc>(SMBJoinDesc.class, SMBMapJoinOperator.class));
- opvec.add(new OpTuple<LimitDesc>(LimitDesc.class, LimitOperator.class));
- opvec.add(new OpTuple<TableScanDesc>(TableScanDesc.class, TableScanOperator.class));
- opvec.add(new OpTuple<UnionDesc>(UnionDesc.class, UnionOperator.class));
- opvec.add(new OpTuple<UDTFDesc>(UDTFDesc.class, UDTFOperator.class));
- opvec.add(new OpTuple<LateralViewJoinDesc>(LateralViewJoinDesc.class,
- LateralViewJoinOperator.class));
- opvec.add(new OpTuple<LateralViewForwardDesc>(LateralViewForwardDesc.class,
- LateralViewForwardOperator.class));
- opvec.add(new OpTuple<HashTableDummyDesc>(HashTableDummyDesc.class,
- HashTableDummyOperator.class));
- opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
- HashTableSinkOperator.class));
- opvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class,
- SparkHashTableSinkOperator.class));
- opvec.add(new OpTuple<DummyStoreDesc>(DummyStoreDesc.class,
- DummyStoreOperator.class));
- opvec.add(new OpTuple<DemuxDesc>(DemuxDesc.class,
- DemuxOperator.class));
- opvec.add(new OpTuple<MuxDesc>(MuxDesc.class,
- MuxOperator.class));
- opvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
- AppMasterEventOperator.class));
- opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
- AppMasterEventOperator.class));
- opvec.add(new OpTuple<SparkPartitionPruningSinkDesc>(SparkPartitionPruningSinkDesc.class,
- SparkPartitionPruningSinkOperator.class));
- opvec.add(new OpTuple<RCFileMergeDesc>(RCFileMergeDesc.class,
- RCFileMergeOperator.class));
- opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
- OrcFileMergeOperator.class));
- opvec.add(new OpTuple<CommonMergeJoinDesc>(CommonMergeJoinDesc.class,
- CommonMergeJoinOperator.class));
- opvec.add(new OpTuple<ListSinkDesc>(ListSinkDesc.class,
- ListSinkOperator.class));
+ opvec.put(FilterDesc.class, FilterOperator.class);
+ opvec.put(SelectDesc.class, SelectOperator.class);
+ opvec.put(ForwardDesc.class, ForwardOperator.class);
+ opvec.put(FileSinkDesc.class, FileSinkOperator.class);
+ opvec.put(CollectDesc.class, CollectOperator.class);
+ opvec.put(ScriptDesc.class, ScriptOperator.class);
+ opvec.put(PTFDesc.class, PTFOperator.class);
+ opvec.put(ReduceSinkDesc.class, ReduceSinkOperator.class);
+ opvec.put(GroupByDesc.class, GroupByOperator.class);
+ opvec.put(JoinDesc.class, JoinOperator.class);
+ opvec.put(MapJoinDesc.class, MapJoinOperator.class);
+ opvec.put(SMBJoinDesc.class, SMBMapJoinOperator.class);
+ opvec.put(LimitDesc.class, LimitOperator.class);
+ opvec.put(TableScanDesc.class, TableScanOperator.class);
+ opvec.put(UnionDesc.class, UnionOperator.class);
+ opvec.put(UDTFDesc.class, UDTFOperator.class);
+ opvec.put(LateralViewJoinDesc.class, LateralViewJoinOperator.class);
+ opvec.put(LateralViewForwardDesc.class, LateralViewForwardOperator.class);
+ opvec.put(HashTableDummyDesc.class, HashTableDummyOperator.class);
+ opvec.put(HashTableSinkDesc.class, HashTableSinkOperator.class);
+ opvec.put(SparkHashTableSinkDesc.class, SparkHashTableSinkOperator.class);
+ opvec.put(DummyStoreDesc.class, DummyStoreOperator.class);
+ opvec.put(DemuxDesc.class, DemuxOperator.class);
+ opvec.put(MuxDesc.class, MuxOperator.class);
+ opvec.put(AppMasterEventDesc.class, AppMasterEventOperator.class);
+ opvec.put(DynamicPruningEventDesc.class, AppMasterEventOperator.class);
+ opvec.put(SparkPartitionPruningSinkDesc.class, SparkPartitionPruningSinkOperator.class);
+ opvec.put(RCFileMergeDesc.class, RCFileMergeOperator.class);
+ opvec.put(OrcFileMergeDesc.class, OrcFileMergeOperator.class);
+ opvec.put(CommonMergeJoinDesc.class, CommonMergeJoinOperator.class);
+ opvec.put(ListSinkDesc.class, ListSinkOperator.class);
}
static {
- vectorOpvec = new ArrayList<OpTuple>();
- vectorOpvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
- VectorAppMasterEventOperator.class));
- vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
- VectorAppMasterEventOperator.class));
- vectorOpvec.add(new OpTuple<SparkPartitionPruningSinkDesc>(
- SparkPartitionPruningSinkDesc.class,
- VectorSparkPartitionPruningSinkOperator.class));
- vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
- vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
- vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class));
- vectorOpvec.add(new OpTuple<SMBJoinDesc>(SMBJoinDesc.class, VectorSMBMapJoinOperator.class));
- vectorOpvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class,
- VectorReduceSinkOperator.class));
- vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class));
- vectorOpvec.add(new OpTuple<FilterDesc>(FilterDesc.class, VectorFilterOperator.class));
- vectorOpvec.add(new OpTuple<LimitDesc>(LimitDesc.class, VectorLimitOperator.class));
- vectorOpvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class,
- VectorSparkHashTableSinkOperator.class));
- }
-
- private static final class OpTuple<T extends OperatorDesc> {
- private final Class<T> descClass;
- private final Class<? extends Operator<?>> opClass;
-
- public OpTuple(Class<T> descClass, Class<? extends Operator<?>> opClass) {
- this.descClass = descClass;
- this.opClass = opClass;
- }
+ vectorOpvec.put(AppMasterEventDesc.class, VectorAppMasterEventOperator.class);
+ vectorOpvec.put(DynamicPruningEventDesc.class, VectorAppMasterEventOperator.class);
+ vectorOpvec.put(
+ SparkPartitionPruningSinkDesc.class, VectorSparkPartitionPruningSinkOperator.class);
+ vectorOpvec.put(SelectDesc.class, VectorSelectOperator.class);
+ vectorOpvec.put(GroupByDesc.class, VectorGroupByOperator.class);
+ vectorOpvec.put(MapJoinDesc.class, VectorMapJoinOperator.class);
+ vectorOpvec.put(SMBJoinDesc.class, VectorSMBMapJoinOperator.class);
+ vectorOpvec.put(ReduceSinkDesc.class, VectorReduceSinkOperator.class);
+ vectorOpvec.put(FileSinkDesc.class, VectorFileSinkOperator.class);
+ vectorOpvec.put(FilterDesc.class, VectorFilterOperator.class);
+ vectorOpvec.put(LimitDesc.class, VectorLimitOperator.class);
+ vectorOpvec.put(SparkHashTableSinkDesc.class, VectorSparkHashTableSinkOperator.class);
}
public static <T extends OperatorDesc> Operator<T> getVectorOperator(
- Class<? extends Operator<?>> opClass, T conf, VectorizationContext vContext) throws HiveException {
+ Class<? extends Operator<?>> opClass, CompilationOpContext cContext, T conf,
+ VectorizationContext vContext) throws HiveException {
try {
Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor(
- VectorizationContext.class, OperatorDesc.class).newInstance(
- vContext, conf);
+ CompilationOpContext.class, VectorizationContext.class, OperatorDesc.class)
+ .newInstance(cContext, vContext, conf);
return op;
} catch (Exception e) {
e.printStackTrace();
@@ -177,50 +152,49 @@ public final class OperatorFactory {
}
}
- public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf,
- VectorizationContext vContext) throws HiveException {
+ public static <T extends OperatorDesc> Operator<T> getVectorOperator(
+ CompilationOpContext cContext, T conf, VectorizationContext vContext) throws HiveException {
Class<T> descClass = (Class<T>) conf.getClass();
- for (OpTuple o : vectorOpvec) {
- if (o.descClass == descClass) {
- return getVectorOperator(o.opClass, conf, vContext);
- }
+ Class<?> opClass = vectorOpvec.get(descClass);
+ if (opClass != null) {
+ return getVectorOperator(vectorOpvec.get(descClass), cContext, conf, vContext);
}
- throw new HiveException("No vector operator for descriptor class "
- + descClass.getName());
+ throw new HiveException("No vector operator for descriptor class " + descClass.getName());
}
- public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
-
- for (OpTuple o : opvec) {
- if (o.descClass == opClass) {
- try {
- Operator<T> op = (Operator<T>) o.opClass.newInstance();
- return op;
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
+ public static <T extends OperatorDesc> Operator<T> get(
+ CompilationOpContext cContext, Class<T> descClass) {
+ Preconditions.checkNotNull(cContext);
+ Class<?> opClass = opvec.get(descClass);
+ if (opClass != null) {
+ try {
+ return (Operator<T>)opClass.getDeclaredConstructor(
+ CompilationOpContext.class).newInstance(cContext);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
}
}
- throw new RuntimeException("No operator for descriptor class "
- + opClass.getName());
+ throw new RuntimeException("No operator for descriptor class " + descClass.getName());
}
- public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass,
- RowSchema rwsch) {
-
- Operator<T> ret = get(opClass);
- ret.setSchema(rwsch);
- return ret;
+ /**
+ * Returns an operator for the given descriptor conf (no children are attached).
+ */
+ public static <T extends OperatorDesc> Operator<T> get(CompilationOpContext cContext, T conf) {
+ Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
+ ret.setConf(conf);
+ return (ret);
}
/**
* Returns an operator given the conf and a list of children operators.
*/
public static <T extends OperatorDesc> Operator<T> get(T conf,
- Operator<? extends OperatorDesc>... oplist) {
- Operator<T> ret = get((Class<T>) conf.getClass());
+ Operator<? extends OperatorDesc> oplist0, Operator<? extends OperatorDesc>... oplist) {
+ Operator<T> ret = get(oplist0.getCompilationOpContext(), (Class<T>) conf.getClass());
ret.setConf(conf);
+ makeChild(ret, oplist0);
makeChild(ret, oplist);
return (ret);
}
@@ -253,27 +227,28 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of children operators.
*/
- public static <T extends OperatorDesc> Operator<T> get(T conf,
- RowSchema rwsch, Operator... oplist) {
- Operator<T> ret = get(conf, oplist);
+ public static <T extends OperatorDesc> Operator<T> get(
+ CompilationOpContext cContext, T conf, RowSchema rwsch) {
+ Operator<T> ret = get(cContext, conf);
ret.setSchema(rwsch);
return (ret);
}
+
/**
* Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
- Operator... oplist) {
- Operator<T> ret = get((Class<T>) conf.getClass());
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
+ T conf, Operator oplist0, Operator... oplist) {
+ Operator<T> ret = get(oplist0.getCompilationOpContext(), (Class<T>) conf.getClass());
ret.setConf(conf);
- if (oplist.length == 0) {
- return (ret);
- }
// Add the new operator as child of each of the passed in operators
+ List<Operator> children = oplist0.getChildOperators();
+ children.add(ret);
+ oplist0.setChildOperators(children);
for (Operator op : oplist) {
- List<Operator> children = op.getChildOperators();
+ children = op.getChildOperators();
children.add(ret);
op.setChildOperators(children);
}
@@ -281,6 +256,7 @@ public final class OperatorFactory {
// add parents for the newly created operator
List<Operator<? extends OperatorDesc>> parent =
new ArrayList<Operator<? extends OperatorDesc>>();
+ parent.add(oplist0);
for (Operator op : oplist) {
parent.add(op);
}
@@ -293,9 +269,9 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
- List<Operator<? extends OperatorDesc>> oplist) {
- Operator<T> ret = get((Class<T>) conf.getClass());
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
+ T conf, List<Operator<? extends OperatorDesc>> oplist) {
+ Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
ret.setConf(conf);
if (oplist.size() == 0) {
return ret;
@@ -322,9 +298,49 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
- RowSchema rwsch, Operator... oplist) {
- Operator<T> ret = getAndMakeChild(conf, oplist);
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
+ CompilationOpContext cContext, T conf, RowSchema rwsch) {
+ Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
+ ret.setConf(conf);
+ ret.setSchema(rwsch);
+ return ret;
+ }
+
+ /**
+ * Returns an operator given the conf and a list of parent operators.
+ */
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
+ CompilationOpContext ctx, T conf, RowSchema rwsch, Operator[] oplist) {
+ Operator<T> ret = get(ctx, (Class<T>) conf.getClass());
+ ret.setConf(conf);
+ ret.setSchema(rwsch);
+ if (oplist.length == 0) return ret;
+
+ // Add the new operator as child of each of the passed in operators
+ for (Operator op : oplist) {
+ List<Operator> children = op.getChildOperators();
+ children.add(ret);
+ op.setChildOperators(children);
+ }
+
+ // add parents for the newly created operator
+ List<Operator<? extends OperatorDesc>> parent =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (Operator op : oplist) {
+ parent.add(op);
+ }
+
+ ret.setParentOperators(parent);
+
+ return (ret);
+ }
+
+ /**
+ * Returns an operator given the conf and a list of parent operators.
+ */
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
+ T conf, RowSchema rwsch, Operator oplist0, Operator... oplist) {
+ Operator<T> ret = getAndMakeChild(conf, oplist0, oplist);
ret.setSchema(rwsch);
return ret;
}
@@ -332,9 +348,9 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
- RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap, Operator... oplist) {
- Operator<T> ret = getAndMakeChild(conf, rwsch, oplist);
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, RowSchema rwsch,
+ Map<String, ExprNodeDesc> colExprMap, Operator oplist0, Operator... oplist) {
+ Operator<T> ret = getAndMakeChild(conf, rwsch, oplist0, oplist);
ret.setColumnExprMap(colExprMap);
return (ret);
}
@@ -342,9 +358,9 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
- RowSchema rwsch, List<Operator<? extends OperatorDesc>> oplist) {
- Operator<T> ret = getAndMakeChild(conf, oplist);
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
+ T conf, RowSchema rwsch, List<Operator<? extends OperatorDesc>> oplist) {
+ Operator<T> ret = getAndMakeChild(cContext, conf, oplist);
ret.setSchema(rwsch);
return (ret);
}
@@ -352,9 +368,10 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
- RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap, List<Operator<? extends OperatorDesc>> oplist) {
- Operator<T> ret = getAndMakeChild(conf, rwsch, oplist);
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
+ T conf, RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap,
+ List<Operator<? extends OperatorDesc>> oplist) {
+ Operator<T> ret = getAndMakeChild(cContext, conf, rwsch, oplist);
ret.setColumnExprMap(colExprMap);
return (ret);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
index 2c9deac..445cf3d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,15 @@ public class OrcFileMergeOperator extends
private Reader reader;
private FSDataInputStream fdis;
+ /** Kryo ctor. */
+ protected OrcFileMergeOperator() {
+ super();
+ }
+
+ public OrcFileMergeOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
public void process(Object row, int tag) throws HiveException {
Object[] keyValue = (Object[]) row;
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
index 113ac21..2e9e539 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
@@ -26,6 +26,7 @@ import java.util.Stack;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -60,6 +61,15 @@ public class PTFOperator extends Operator<PTFDesc> implements Serializable {
transient Configuration hiveConf;
transient PTFInvocation ptfInvocation;
+ /** Kryo ctor. */
+ protected PTFOperator() {
+ super();
+ }
+
+ public PTFOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
/*
* 1. Find out if the operator is invoked at Map-Side or Reduce-side
* 2. Get the deserialized QueryDef
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
index c34454c..4dea1d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;
@@ -36,12 +37,22 @@ import java.io.IOException;
*/
public class RCFileMergeOperator
extends AbstractFileMergeOperator<RCFileMergeDesc> {
+
public final static Logger LOG = LoggerFactory.getLogger("RCFileMergeMapper");
RCFile.Writer outWriter;
CompressionCodec codec = null;
int columnNumber = 0;
+ /** Kryo ctor. */
+ protected RCFileMergeOperator() {
+ super();
+ }
+
+ public RCFileMergeOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
public void process(Object row, int tag) throws HiveException {
Object[] keyValue = (Object[]) row;
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 4b65952..74b4802 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -151,6 +152,15 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
protected transient long logEveryNRows = 0;
private final transient LongWritable recordCounter = new LongWritable();
+ /** Kryo ctor. */
+ protected ReduceSinkOperator() {
+ super();
+ }
+
+ public ReduceSinkOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);