You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/01/19 03:47:38 UTC

[4/4] hive git commit: HIVE-12758 : Parallel compilation: Operator::resetId() is not thread-safe (Sergey Shelukhin, reviewed by Gopal V)

HIVE-12758 : Parallel compilation: Operator::resetId() is not thread-safe (Sergey Shelukhin, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/88fceaca
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/88fceaca
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/88fceaca

Branch: refs/heads/master
Commit: 88fceacaa1f4d9d2d4e56850006d4d1ddfdbf102
Parents: 4eab2df
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Jan 18 18:47:17 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Jan 18 18:47:17 2016 -0800

----------------------------------------------------------------------
 .../mapreduce/TestHCatMultiOutputFormat.java    |   4 +-
 .../hadoop/hive/ql/CompilationOpContext.java    |  36 +++
 .../java/org/apache/hadoop/hive/ql/Context.java |   5 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  10 +-
 .../hive/ql/exec/AbstractFileMergeOperator.java |  10 +
 .../hive/ql/exec/AbstractMapJoinOperator.java   |   9 +-
 .../hive/ql/exec/AppMasterEventOperator.java    |  10 +
 .../hadoop/hive/ql/exec/CollectOperator.java    |  10 +
 .../hadoop/hive/ql/exec/ColumnStatsTask.java    |   8 +-
 .../hive/ql/exec/ColumnStatsUpdateTask.java     |   6 +-
 .../hadoop/hive/ql/exec/CommonJoinOperator.java |  12 +-
 .../hive/ql/exec/CommonMergeJoinOperator.java   |   8 +-
 .../hadoop/hive/ql/exec/ConditionalTask.java    |   5 -
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |  19 +-
 .../hadoop/hive/ql/exec/DemuxOperator.java      |  10 +
 .../hadoop/hive/ql/exec/DummyStoreOperator.java |   8 +-
 .../apache/hadoop/hive/ql/exec/FetchTask.java   |   8 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  10 +
 .../hadoop/hive/ql/exec/FilterOperator.java     |   8 +-
 .../hadoop/hive/ql/exec/ForwardOperator.java    |  10 +
 .../hadoop/hive/ql/exec/FunctionTask.java       |   6 +-
 .../hadoop/hive/ql/exec/GroupByOperator.java    |  10 +
 .../hive/ql/exec/HashTableDummyOperator.java    |  10 +
 .../hive/ql/exec/HashTableSinkOperator.java     |  12 +-
 .../hadoop/hive/ql/exec/JoinOperator.java       |  10 +
 .../ql/exec/LateralViewForwardOperator.java     |  10 +
 .../hive/ql/exec/LateralViewJoinOperator.java   |  10 +
 .../hadoop/hive/ql/exec/LimitOperator.java      |  10 +
 .../hadoop/hive/ql/exec/ListSinkOperator.java   |  10 +
 .../hadoop/hive/ql/exec/MapJoinOperator.java    |   9 +-
 .../apache/hadoop/hive/ql/exec/MapOperator.java |  10 +
 .../apache/hadoop/hive/ql/exec/MuxOperator.java |  10 +
 .../apache/hadoop/hive/ql/exec/Operator.java    |  57 ++--
 .../hadoop/hive/ql/exec/OperatorFactory.java    | 287 ++++++++++---------
 .../hive/ql/exec/OrcFileMergeOperator.java      |  10 +
 .../apache/hadoop/hive/ql/exec/PTFOperator.java |  10 +
 .../hive/ql/exec/RCFileMergeOperator.java       |  11 +
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java |  10 +
 .../hadoop/hive/ql/exec/SMBMapJoinOperator.java |   9 +-
 .../hadoop/hive/ql/exec/ScriptOperator.java     |  10 +
 .../hadoop/hive/ql/exec/SelectOperator.java     |  10 +
 .../hive/ql/exec/SerializationUtilities.java    |  51 +++-
 .../ql/exec/SparkHashTableSinkOperator.java     |  12 +-
 .../hadoop/hive/ql/exec/StatsNoJobTask.java     |   6 +-
 .../hadoop/hive/ql/exec/TableScanOperator.java  |  10 +
 .../org/apache/hadoop/hive/ql/exec/Task.java    |   4 +-
 .../hive/ql/exec/TemporaryHashSinkOperator.java |   4 +-
 .../hadoop/hive/ql/exec/TerminalOperator.java   |   9 +
 .../hive/ql/exec/TezDummyStoreOperator.java     |  10 +
 .../hadoop/hive/ql/exec/UDTFOperator.java       |  10 +
 .../hadoop/hive/ql/exec/UnionOperator.java      |  12 +-
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      |   6 +-
 .../hadoop/hive/ql/exec/mr/ExecMapper.java      |   6 +-
 .../hadoop/hive/ql/exec/mr/HashTableLoader.java |   3 +-
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java |   6 +-
 .../hive/ql/exec/spark/HashTableLoader.java     |   3 +-
 .../ql/exec/spark/SparkMapRecordHandler.java    |   6 +-
 .../hadoop/hive/ql/exec/spark/SparkTask.java    |   6 +-
 .../hive/ql/exec/tez/MapRecordProcessor.java    |  10 +-
 .../vector/VectorAppMasterEventOperator.java    |  15 +-
 .../ql/exec/vector/VectorFileSinkOperator.java  |  14 +-
 .../ql/exec/vector/VectorFilterOperator.java    |  14 +-
 .../ql/exec/vector/VectorGroupByOperator.java   |  15 +-
 .../ql/exec/vector/VectorLimitOperator.java     |  12 +-
 .../exec/vector/VectorMapJoinBaseOperator.java  |  14 +-
 .../ql/exec/vector/VectorMapJoinOperator.java   |  14 +-
 .../VectorMapJoinOuterFilteredOperator.java     |  14 +-
 .../hive/ql/exec/vector/VectorMapOperator.java  |  10 +
 .../exec/vector/VectorReduceSinkOperator.java   |  14 +-
 .../exec/vector/VectorSMBMapJoinOperator.java   |  14 +-
 .../ql/exec/vector/VectorSelectOperator.java    |  14 +-
 .../VectorSparkHashTableSinkOperator.java       |  14 +-
 ...VectorSparkPartitionPruningSinkOperator.java |  15 +-
 .../mapjoin/VectorMapJoinCommonOperator.java    |  14 +-
 .../VectorMapJoinGenerateResultOperator.java    |  14 +-
 ...pJoinInnerBigOnlyGenerateResultOperator.java |  14 +-
 .../VectorMapJoinInnerBigOnlyLongOperator.java  |  13 +-
 ...ctorMapJoinInnerBigOnlyMultiKeyOperator.java |  13 +-
 ...VectorMapJoinInnerBigOnlyStringOperator.java |  13 +-
 ...ectorMapJoinInnerGenerateResultOperator.java |  14 +-
 .../mapjoin/VectorMapJoinInnerLongOperator.java |  13 +-
 .../VectorMapJoinInnerMultiKeyOperator.java     |  13 +-
 .../VectorMapJoinInnerStringOperator.java       |  13 +-
 ...orMapJoinLeftSemiGenerateResultOperator.java |  14 +-
 .../VectorMapJoinLeftSemiLongOperator.java      |  13 +-
 .../VectorMapJoinLeftSemiMultiKeyOperator.java  |  13 +-
 .../VectorMapJoinLeftSemiStringOperator.java    |  13 +-
 ...ectorMapJoinOuterGenerateResultOperator.java |  14 +-
 .../mapjoin/VectorMapJoinOuterLongOperator.java |  13 +-
 .../VectorMapJoinOuterMultiKeyOperator.java     |  13 +-
 .../VectorMapJoinOuterStringOperator.java       |  13 +-
 .../VectorReduceSinkCommonOperator.java         |  14 +-
 .../VectorReduceSinkLongOperator.java           |  14 +-
 .../VectorReduceSinkMultiKeyOperator.java       |  14 +-
 .../VectorReduceSinkStringOperator.java         |  14 +-
 .../apache/hadoop/hive/ql/hooks/ATSHook.java    |   2 +-
 .../hadoop/hive/ql/io/merge/MergeFileTask.java  |   5 +-
 .../ql/io/rcfile/stats/PartialScanTask.java     |   7 +-
 .../io/rcfile/truncate/ColumnTruncateTask.java  |   5 +-
 .../hive/ql/optimizer/AbstractSMBJoinProc.java  |   2 +-
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |   8 +-
 .../DynamicPartitionPruningOptimization.java    |   4 +-
 .../hive/ql/optimizer/GenMRTableScan1.java      |   2 +-
 .../hive/ql/optimizer/GenMapRedUtils.java       |  23 +-
 .../hive/ql/optimizer/GroupByOptimizer.java     |   4 +-
 .../hive/ql/optimizer/MapJoinProcessor.java     |  12 +-
 .../ql/optimizer/ReduceSinkMapJoinProc.java     |   4 +-
 .../ql/optimizer/SimpleFetchAggregation.java    |   3 +-
 .../hive/ql/optimizer/SimpleFetchOptimizer.java |   3 +-
 .../hive/ql/optimizer/SkewJoinOptimizer.java    |   8 +-
 .../calcite/translator/HiveGBOpConvUtil.java    |  10 +-
 .../calcite/translator/HiveOpConverter.java     |  34 +--
 .../QueryPlanTreeTransformation.java            |  12 +-
 .../physical/CommonJoinTaskDispatcher.java      |   4 +-
 .../physical/GenMRSkewJoinProcessor.java        |   7 +-
 .../physical/GenSparkSkewJoinProcessor.java     |  13 +-
 .../physical/LocalMapJoinProcFactory.java       |   5 +-
 .../hive/ql/optimizer/physical/Vectorizer.java  |  15 +-
 .../spark/SparkReduceSinkMapJoinProc.java       |   7 +-
 .../optimizer/unionproc/UnionProcFactory.java   |   3 +-
 .../hive/ql/parse/BaseSemanticAnalyzer.java     |   2 +
 .../hive/ql/parse/ExplainSemanticAnalyzer.java  |   2 +-
 .../apache/hadoop/hive/ql/parse/GenTezWork.java |   5 +-
 .../hive/ql/parse/ProcessAnalyzeTable.java      |   3 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  31 +-
 .../hive/ql/parse/spark/GenSparkWork.java       |   3 +-
 .../SparkPartitionPruningSinkOperator.java      |  10 +
 .../parse/spark/SparkProcessAnalyzeTable.java   |   3 +-
 .../apache/hadoop/hive/ql/plan/BaseWork.java    |   1 +
 .../hadoop/hive/ql/plan/ColumnStatsWork.java    |   5 +-
 .../apache/hadoop/hive/ql/plan/FetchWork.java   |   5 +-
 .../org/apache/hadoop/hive/ql/plan/MapWork.java |   5 +
 .../apache/hadoop/hive/ql/plan/MapredWork.java  |   7 +-
 .../hadoop/hive/ql/plan/MergeJoinWork.java      |   5 +
 .../apache/hadoop/hive/ql/plan/ReduceWork.java  |   6 +
 .../apache/hadoop/hive/ql/plan/UnionWork.java   |   5 +
 .../hadoop/hive/ql/ppd/OpProcFactory.java       |   3 +-
 .../ql/ppd/PredicateTransitivePropagate.java    |   4 +-
 .../hive/ql/ppd/SyntheticJoinPredicate.java     |   4 +-
 .../hadoop/hive/ql/exec/TestExecDriver.java     |  38 +--
 .../hive/ql/exec/TestFileSinkOperator.java      |   4 +-
 .../hadoop/hive/ql/exec/TestOperators.java      |  17 +-
 .../apache/hadoop/hive/ql/exec/TestPlan.java    |   3 +-
 .../exec/vector/TestVectorFilterOperator.java   |   3 +-
 .../exec/vector/TestVectorGroupByOperator.java  |  58 ++--
 .../ql/exec/vector/TestVectorLimitOperator.java |   3 +-
 .../exec/vector/TestVectorSelectOperator.java   |  10 +-
 .../vector/util/FakeCaptureOutputOperator.java  |  14 +-
 .../util/FakeVectorDataSourceOperator.java      |  16 +-
 .../ql/optimizer/physical/TestVectorizer.java   |   7 +-
 .../hadoop/hive/ql/parse/TestGenTezWork.java    |   8 +-
 .../parse/TestUpdateDeleteSemanticAnalyzer.java |   2 +-
 .../hive/ql/testutil/BaseScalarUdfTest.java     |   3 +-
 .../results/clientpositive/auto_join0.q.out     |   8 +-
 .../cbo_rp_cross_product_check_2.q.out          |   4 +-
 .../clientpositive/cross_product_check_2.q.out  |   4 +-
 .../subquery_multiinsert.q.java1.7.out          |   4 +-
 .../subquery_multiinsert.q.java1.8.out          |  60 ++--
 158 files changed, 1379 insertions(+), 598 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
index 8148faa..61efc1a 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -40,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -392,7 +392,7 @@ public class TestHCatMultiOutputFormat {
     }
     FetchTask task = new FetchTask();
     task.setWork(work);
-    task.initialize(conf, null, null);
+    task.initialize(conf, null, null, new CompilationOpContext());
     task.fetch(temp);
     for (String str : temp) {
       results.add(str.replace("\t", ","));

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java b/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
new file mode 100644
index 0000000..949f873
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A subset of compilation context that is passed to operators to get rid of some globals.
+ * Perhaps this should be rolled into main Context; however, some code necessitates storing the
+ * context in the operators for now, so this may not be advisable given how much stuff the main
+ * Context class contains.
+ * For now, only the operator sequence ID lives here.
+ */
+public class CompilationOpContext {
+  private final AtomicInteger opSeqId = new AtomicInteger(0);
+
+  public int nextOperatorId() {
+    return opSeqId.getAndIncrement();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index affaec8..746456b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -67,6 +67,7 @@ public class Context {
   private int resDirFilesNum;
   boolean initialized;
   String originalTracker = null;
+  private final CompilationOpContext opContext;
   private final Map<String, ContentSummary> pathToCS = new ConcurrentHashMap<String, ContentSummary>();
 
   // scratch path to use for all non-local (ie. hdfs) file system tmp folders
@@ -133,6 +134,7 @@ public class Context {
     localScratchDir = new Path(SessionState.getLocalSessionPath(conf), executionId).toUri().getPath();
     scratchDirPermission = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION);
     stagingDir = HiveConf.getVar(conf, HiveConf.ConfVars.STAGINGDIR);
+    opContext = new CompilationOpContext();
   }
 
 
@@ -715,4 +717,7 @@ public class Context {
     this.cboSucceeded = cboSucceeded;
   }
 
+  public CompilationOpContext getOpContext() {
+    return opContext;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 12a7eea..75187cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -169,7 +169,7 @@ public class Driver implements CommandProcessor {
 
   @Override
   public void init() {
-    Operator.resetId();
+    // Nothing for now.
   }
 
   /**
@@ -494,7 +494,7 @@ public class Driver implements CommandProcessor {
 
       // initialize FetchTask right here
       if (plan.getFetchTask() != null) {
-        plan.getFetchTask().initialize(conf, plan, null);
+        plan.getFetchTask().initialize(conf, plan, null, ctx.getOpContext());
       }
 
       //do the authorization check
@@ -572,7 +572,7 @@ public class Driver implements CommandProcessor {
       ASTNode astTree) throws IOException {
     String ret = null;
     ExplainTask task = new ExplainTask();
-    task.initialize(conf, plan, null);
+    task.initialize(conf, plan, null, ctx.getOpContext());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     PrintStream ps = new PrintStream(baos);
     try {
@@ -1775,7 +1775,7 @@ public class Driver implements CommandProcessor {
       cxt.incCurJobNo(1);
       console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
     }
-    tsk.initialize(conf, plan, cxt);
+    tsk.initialize(conf, plan, cxt, ctx.getOpContext());
     TaskResult tskRes = new TaskResult();
     TaskRunner tskRun = new TaskRunner(tsk, tskRes);
 
@@ -1865,7 +1865,7 @@ public class Driver implements CommandProcessor {
         throw new IOException("Error closing the current fetch task", e);
       }
       // FetchTask should not depend on the plan.
-      fetchTask.initialize(conf, null, null);
+      fetchTask.initialize(conf, null, null, ctx.getOpContext());
     } else {
       ctx.resetStream();
       resStream = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index a3ec0e1..f99bf11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
@@ -61,6 +62,15 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
   protected Set<Path> incompatFileSet;
   protected transient DynamicPartitionCtx dpCtx;
 
+  /** Kryo ctor. */
+  protected AbstractFileMergeOperator() {
+    super();
+  }
+
+  public AbstractFileMergeOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   public void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
index 7302688..69ba4a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -50,7 +51,13 @@ public abstract class AbstractMapJoinOperator <T extends MapJoinDesc> extends Co
 
   transient int numMapRowsRead;
 
-  public AbstractMapJoinOperator() {
+  /** Kryo ctor. */
+  protected AbstractMapJoinOperator() {
+    super();
+  }
+
+  public AbstractMapJoinOperator(CompilationOpContext ctx) {
+    super(ctx);
   }
 
   public AbstractMapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) {

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
index 7114177..743098b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
@@ -51,6 +52,15 @@ public class AppMasterEventOperator extends Operator<AppMasterEventDesc> {
   protected transient boolean hasReachedMaxSize = false;
   protected transient long MAX_SIZE;
 
+  /** Kryo ctor. */
+  protected AppMasterEventOperator() {
+    super();
+  }
+
+  public AppMasterEventOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   public void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
index e2f4f58..27ddf13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -42,6 +43,15 @@ public class CollectOperator extends Operator<CollectDesc> implements
   protected transient ObjectInspector standardRowInspector;
   transient int maxSize;
 
+  /** Kryo ctor. */
+  protected CollectOperator() {
+    super();
+  }
+
+  public CollectOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
index f6fbe74..7914471 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -81,9 +82,10 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
   }
 
   @Override
-  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
-    super.initialize(conf, queryPlan, ctx);
-    work.initializeForFetch();
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
+      CompilationOpContext opContext) {
+    super.initialize(conf, queryPlan, ctx, opContext);
+    work.initializeForFetch(opContext);
     try {
       JobConf job = new JobConf(conf);
       ftOp = new FetchOperator(work.getfWork(), job);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
index dcbbe2e..a1b98f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -68,8 +69,9 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> {
       .getLogger(ColumnStatsUpdateTask.class);
 
   @Override
-  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
-    super.initialize(conf, queryPlan, ctx);
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
+      CompilationOpContext opContext) {
+    super.initialize(conf, queryPlan, ctx, opContext);
   }
 
   private ColumnStatistics constructColumnStatsFromInput()

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index b0170f5..f8520f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.persistence.AbstractRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -125,17 +126,23 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
   protected transient int heartbeatInterval;
   protected static final int NOTSKIPBIGTABLE = -1;
 
-  public CommonJoinOperator() {
+  /** Kryo ctor. */
+  protected CommonJoinOperator() {
+    super();
+  }
+
+  public CommonJoinOperator(CompilationOpContext ctx) {
+    super(ctx);
   }
 
   public CommonJoinOperator(CommonJoinOperator<T> clone) {
+    super(clone.id, clone.cContext);
     this.joinEmitInterval = clone.joinEmitInterval;
     this.joinCacheSize = clone.joinCacheSize;
     this.nextSz = clone.nextSz;
     this.childOperators = clone.childOperators;
     this.parentOperators = clone.parentOperators;
     this.done = clone.done;
-    this.operatorId = clone.operatorId;
     this.storage = clone.storage;
     this.condn = clone.condn;
     this.conf = clone.getConf();
@@ -150,7 +157,6 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
     this.groupKeyObject = clone.groupKeyObject;
     this.handleSkewJoin = clone.handleSkewJoin;
     this.hconf = clone.hconf;
-    this.id = clone.id;
     this.inputObjInspectors = clone.inputObjInspectors;
     this.noOuterJoin = clone.noOuterJoin;
     this.numAliases = clone.numAliases;

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 1cbd13d..8693200 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
 import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource;
@@ -90,10 +91,15 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
       new ArrayList<Operator<? extends OperatorDesc>>();
   transient Set<Integer> fetchInputAtClose;
 
-  public CommonMergeJoinOperator() {
+  /** Kryo ctor. */
+  protected CommonMergeJoinOperator() {
     super();
   }
 
+  public CommonMergeJoinOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void initializeOp(Configuration hconf) throws HiveException {

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
index 031331e..c96c813 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
@@ -72,11 +72,6 @@ public class ConditionalTask extends Task<ConditionalWork> implements Serializab
   }
 
   @Override
-  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
-    super.initialize(conf, queryPlan, driverContext);
-  }
-
-  @Override
   public int execute(DriverContext driverContext) {
     resTasks = resolver.getTasks(conf, resolverCtx);
     resolved = true;

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 5f3a9cf..94bb73c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -260,8 +261,9 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
   }
 
   @Override
-  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
-    super.initialize(conf, queryPlan, ctx);
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
+      CompilationOpContext opContext) {
+    super.initialize(conf, queryPlan, ctx, opContext);
 
     // Pick the formatter to use to display the results.  Either the
     // normal human readable output or a json object.
@@ -507,7 +509,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
 
       AlterTablePartMergeFilesDesc mergeFilesDesc = work.getMergeFilesDesc();
       if (mergeFilesDesc != null) {
-        return mergeFiles(db, mergeFilesDesc);
+        return mergeFiles(db, mergeFilesDesc, driverContext);
       }
 
       AlterTableAlterPartDesc alterPartDesc = work.getAlterTableAlterPartDesc();
@@ -609,8 +611,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
    * @return
    * @throws HiveException
    */
-  private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc)
-      throws HiveException {
+  private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc,
+      DriverContext driverContext) throws HiveException {
     ListBucketingCtx lbCtx = mergeFilesDesc.getLbCtx();
     boolean lbatc = lbCtx == null ? false : lbCtx.isSkewedStoredAsDir();
     int lbd = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel();
@@ -642,7 +644,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     fmd.setListBucketingDepth(lbd);
     fmd.setOutputPath(mergeFilesDesc.getOutputDir());
 
-    Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(fmd);
+    CompilationOpContext opContext = driverContext.getCtx().getOpContext();
+    Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(opContext, fmd);
 
     LinkedHashMap<String, Operator<? extends  OperatorDesc>> aliasToWork =
         new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
@@ -662,7 +665,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     }
 
     // initialize the task and execute
-    task.initialize(db.getConf(), getQueryPlan(), driverCxt);
+    task.initialize(db.getConf(), getQueryPlan(), driverCxt, opContext);
     int ret = task.execute(driverCxt);
     return ret;
   }
@@ -4270,7 +4273,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       truncateWork.setMapperCannotSpanPartns(true);
       DriverContext driverCxt = new DriverContext();
       ColumnTruncateTask taskExec = new ColumnTruncateTask();
-      taskExec.initialize(db.getConf(), null, driverCxt);
+      taskExec.initialize(db.getConf(), null, driverCxt, null);
       taskExec.setWork(truncateWork);
       taskExec.setQueryPlan(this.getQueryPlan());
       return taskExec.execute(driverCxt);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
index 0888c7b..b897c16 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -109,6 +110,15 @@ public class DemuxOperator extends Operator<DemuxDesc>
   // its children's parents lists, also see childOperatorsTag in Operator) at here.
   private int[][] newChildOperatorsTag;
 
+  /** Kryo ctor. */
+  protected DemuxOperator() {
+    super();
+  }
+
+  public DemuxOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
index 0c12570..06a3884 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -69,10 +70,15 @@ public class DummyStoreOperator extends Operator<DummyStoreDesc> implements Seri
 
   protected transient InspectableObject result;
 
-  public DummyStoreOperator() {
+  /** Kryo ctor. */
+  protected DummyStoreOperator() {
     super();
   }
 
+  public DummyStoreOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 1634143..4415328 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
@@ -59,9 +60,10 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
   }
 
   @Override
-  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
-    super.initialize(conf, queryPlan, ctx);
-    work.initializeForFetch();
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
+      CompilationOpContext opContext) {
+    super.initialize(conf, queryPlan, ctx, opContext);
+    work.initializeForFetch(opContext);
 
     try {
       // Create a file system handle

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 32bfcf5..2fa3d96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -322,6 +323,15 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     childSpecPathDynLinkedPartitions = conf.getDirName().getName();
   }
 
+  /** Kryo ctor. */
+  protected FileSinkOperator() {
+    super();
+  }
+
+  public FileSinkOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index 0e7e79d..08f2633 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,8 +46,13 @@ public class FilterOperator extends Operator<FilterDesc> implements
   private transient IOContext ioContext;
   protected transient int heartbeatInterval;
 
-  public FilterOperator() {
+  /** Kryo ctor. */
+  protected FilterOperator() {
     super();
+  }
+
+  public FilterOperator(CompilationOpContext ctx) {
+    super(ctx);
     consecutiveSearches = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
index 7a4c58a..2df7cca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -61,6 +62,15 @@ public class ForwardOperator extends Operator<ForwardDesc> implements
     return "FOR";
   }
 
+  /** Kryo ctor. */
+  protected ForwardOperator() {
+    super();
+  }
+
+  public ForwardOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
index ec755a8..ed6f062 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -60,8 +61,9 @@ public class FunctionTask extends Task<FunctionWork> {
   }
 
   @Override
-  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
-    super.initialize(conf, queryPlan, ctx);
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
+      CompilationOpContext opContext) {
+    super.initialize(conf, queryPlan, ctx, opContext);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 1693ec3..0839b42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -37,6 +37,7 @@ import javolution.util.FastBitSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
@@ -179,6 +180,15 @@ public class GroupByOperator extends Operator<GroupByDesc> {
     return bits;
   }
 
+  /** Kryo ctor. */
+  protected GroupByOperator() {
+    super();
+  }
+
+  public GroupByOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
index 1de8c76..4749247 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -32,6 +33,15 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
 public class HashTableDummyOperator extends Operator<HashTableDummyDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
 
+  /** Kryo ctor. */
+  protected HashTableDummyOperator() {
+    super();
+  }
+
+  public HashTableDummyOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index 76308f6..deb7c76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
@@ -104,10 +105,17 @@ public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> i
   private long hashTableScale;
   private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
 
-  public HashTableSinkOperator() {
+  /** Kryo ctor. */
+  protected HashTableSinkOperator() {
+    super();
   }
 
-  public HashTableSinkOperator(MapJoinOperator mjop) {
+  public HashTableSinkOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public HashTableSinkOperator(CompilationOpContext ctx, MapJoinOperator mjop) {
+    this(ctx);
     this.conf = new HashTableSinkDesc(mjop.getConf());
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index 3453fc9..08cc4b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -55,6 +56,15 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial
 
   private final transient LongWritable skewjoin_followup_jobs = new LongWritable(0);
 
+  /** Kryo ctor. */
+  protected JoinOperator() {
+    super();
+  }
+
+  public JoinOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
index e866eed..4c94ad9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -55,6 +56,15 @@ public class LateralViewForwardOperator extends Operator<LateralViewForwardDesc>
     return OperatorType.LATERALVIEWFORWARD;
   }
 
+  /** Kryo ctor. */
+  protected LateralViewForwardOperator() {
+    super();
+  }
+
+  public LateralViewForwardOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
index 55bb08f..7407dc6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -83,6 +84,15 @@ public class LateralViewJoinOperator extends Operator<LateralViewJoinDesc> {
   public static final byte SELECT_TAG = 0;
   public static final byte UDTF_TAG = 1;
 
+  /** Kryo ctor. */
+  protected LateralViewJoinOperator() {
+    super();
+  }
+
+  public LateralViewJoinOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
index fc85bea..239d56b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -39,6 +40,15 @@ public class LimitOperator extends Operator<LimitDesc> implements Serializable {
   protected transient int currCount;
   protected transient boolean isMap;
 
+  /** Kryo ctor. */
+  protected LimitOperator() {
+    super();
+  }
+
+  public LimitOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
index 919e72f..2f2abc1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -43,6 +44,15 @@ public class ListSinkOperator extends Operator<ListSinkDesc> {
   private transient FetchFormatter fetcher;
   private transient int numRows;
 
+  /** Kryo ctor. */
+  protected ListSinkOperator() {
+    super();
+  }
+
+  public ListSinkOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index dc0b85e..91b5ca7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap;
@@ -97,7 +98,13 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
   protected HybridHashTableContainer firstSmallTable; // The first small table;
                                                       // Only this table has spilled big table rows
 
-  public MapJoinOperator() {
+  /** Kryo ctor. */
+  protected MapJoinOperator() {
+    super();
+  }
+
+  public MapJoinOperator(CompilationOpContext ctx) {
+    super(ctx);
   }
 
   public MapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) {

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index ec0d95c..2b690f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.MapOperator.MapOpCtx;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -463,6 +464,15 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
     return nominal;
   }
 
+  /** Kryo ctor. */
+  protected MapOperator() {
+    super();
+  }
+
+  public MapOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   public void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
index 4f4abd3..d8444fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MuxDesc;
@@ -170,6 +171,15 @@ public class MuxOperator extends Operator<MuxDesc> implements Serializable{
   private transient long[] cntrs;
   private transient long[] nextCntrs;
 
+  /** Kryo ctor. */
+  protected MuxOperator() {
+    super();
+  }
+
+  public MuxOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 85ab6b2..dbe4f80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -30,11 +30,11 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -67,6 +67,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   public static final String CONTEXT_NAME_KEY = "__hive.context.name";
 
   private transient Configuration configuration;
+  protected transient CompilationOpContext cContext;
   protected List<Operator<? extends OperatorDesc>> childOperators;
   protected List<Operator<? extends OperatorDesc>> parentOperators;
   protected String operatorId;
@@ -75,8 +76,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   private transient boolean rootInitializeCalled = false;
   protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>();
 
-  private static AtomicInteger seqId;
-
   // It can be optimized later so that an operator operator (init/close) is performed
   // only after that operation has been performed on all the parents. This will require
   // initializing the whole tree in all the mappers (which might be required for mappers
@@ -98,38 +97,24 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
 
   protected transient State state = State.UNINIT;
 
-  static {
-    seqId = new AtomicInteger(0);
-  }
-
   private boolean useBucketizedHiveInputFormat;
 
   // dummy operator (for not increasing seqId)
-  private Operator(String name) {
-    id = name;
+  protected Operator(String name, CompilationOpContext cContext) {
+    this();
+    this.cContext = cContext;
+    this.id = name;
     initOperatorId();
+  }
+
+  protected Operator() {
     childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     abortOp = new AtomicBoolean(false);
   }
 
-  public Operator() {
-    this(String.valueOf(seqId.getAndIncrement()));
-  }
-
-  public static void resetId() {
-    seqId.set(0);
-  }
-
-  /**
-   * Create an operator with a reporter.
-   *
-   * @param reporter
-   *          Used to report progress of certain operators.
-   */
-  public Operator(Reporter reporter) {
-    this();
-    this.reporter = reporter;
+  public Operator(CompilationOpContext cContext) {
+    this(String.valueOf(cContext.nextOperatorId()), cContext);
   }
 
   public void setChildOperators(
@@ -228,7 +213,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   protected transient final boolean isLogTraceEnabled = LOG.isTraceEnabled() && PLOG.isTraceEnabled();
   protected transient String alias;
   protected transient Reporter reporter;
-  protected transient String id;
+  protected String id;
   // object inspectors for input rows
   // We will increase the size of the array on demand
   protected transient ObjectInspector[] inputObjInspectors = new ObjectInspector[1];
@@ -1129,8 +1114,8 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     @SuppressWarnings("unchecked")
     T descClone = (T)conf.clone();
     // also clone the colExprMap by default
-    Operator<? extends OperatorDesc> ret =
-        OperatorFactory.getAndMakeChild(descClone, getSchema(), getColumnExprMap(), parentClones);
+    Operator<? extends OperatorDesc> ret = OperatorFactory.getAndMakeChild(
+            cContext, descClone, getSchema(), getColumnExprMap(), parentClones);
 
     return ret;
   }
@@ -1145,8 +1130,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
     T descClone = (T) conf.clone();
     Operator<? extends OperatorDesc> ret =
-        OperatorFactory.getAndMakeChild(
-        descClone, getSchema());
+        OperatorFactory.getAndMakeChild(cContext, descClone, getSchema());
     return ret;
   }
 
@@ -1355,7 +1339,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
 
   @SuppressWarnings({ "serial", "unchecked", "rawtypes" })
   private static class DummyOperator extends Operator {
-    public DummyOperator() { super("dummy"); }
+    public DummyOperator() { super("dummy", null); }
 
     @Override
     public void process(Object row, int tag) {
@@ -1384,4 +1368,13 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   public String getReduceOutputName() {
     return null;
   }
+
+  public void setCompilationOpContext(CompilationOpContext ctx) {
+    cContext = ctx;
+  }
+
+  /** @return Compilation operator context. Only available during compilation. */
+  public CompilationOpContext getCompilationOpContext() {
+    return cContext;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index f619a56..038b96c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.util.ArrayList;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -72,6 +74,8 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
 
+import com.google.common.base.Preconditions;
+
 /**
  * OperatorFactory.
  *
@@ -79,97 +83,68 @@ import org.apache.hadoop.hive.ql.plan.UnionDesc;
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public final class OperatorFactory {
   protected static transient final Logger LOG = LoggerFactory.getLogger(OperatorFactory.class);
-  private static final List<OpTuple> opvec;
-  private static final List<OpTuple> vectorOpvec;
+  private static final IdentityHashMap<Class<? extends OperatorDesc>,
+    Class<? extends Operator<? extends OperatorDesc>>> opvec = new IdentityHashMap<>();
+  private static final IdentityHashMap<Class<? extends OperatorDesc>,
+    Class<? extends Operator<? extends OperatorDesc>>> vectorOpvec = new IdentityHashMap<>();
 
   static {
-    opvec = new ArrayList<OpTuple>();
-    opvec.add(new OpTuple<FilterDesc>(FilterDesc.class, FilterOperator.class));
-    opvec.add(new OpTuple<SelectDesc>(SelectDesc.class, SelectOperator.class));
-    opvec.add(new OpTuple<ForwardDesc>(ForwardDesc.class, ForwardOperator.class));
-    opvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, FileSinkOperator.class));
-    opvec.add(new OpTuple<CollectDesc>(CollectDesc.class, CollectOperator.class));
-    opvec.add(new OpTuple<ScriptDesc>(ScriptDesc.class, ScriptOperator.class));
-    opvec.add(new OpTuple<PTFDesc>(PTFDesc.class, PTFOperator.class));
-    opvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class, ReduceSinkOperator.class));
-    opvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, GroupByOperator.class));
-    opvec.add(new OpTuple<JoinDesc>(JoinDesc.class, JoinOperator.class));
-    opvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, MapJoinOperator.class));
-    opvec.add(new OpTuple<SMBJoinDesc>(SMBJoinDesc.class, SMBMapJoinOperator.class));
-    opvec.add(new OpTuple<LimitDesc>(LimitDesc.class, LimitOperator.class));
-    opvec.add(new OpTuple<TableScanDesc>(TableScanDesc.class, TableScanOperator.class));
-    opvec.add(new OpTuple<UnionDesc>(UnionDesc.class, UnionOperator.class));
-    opvec.add(new OpTuple<UDTFDesc>(UDTFDesc.class, UDTFOperator.class));
-    opvec.add(new OpTuple<LateralViewJoinDesc>(LateralViewJoinDesc.class,
-        LateralViewJoinOperator.class));
-    opvec.add(new OpTuple<LateralViewForwardDesc>(LateralViewForwardDesc.class,
-        LateralViewForwardOperator.class));
-    opvec.add(new OpTuple<HashTableDummyDesc>(HashTableDummyDesc.class,
-        HashTableDummyOperator.class));
-    opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
-        HashTableSinkOperator.class));
-    opvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class,
-        SparkHashTableSinkOperator.class));
-    opvec.add(new OpTuple<DummyStoreDesc>(DummyStoreDesc.class,
-        DummyStoreOperator.class));
-    opvec.add(new OpTuple<DemuxDesc>(DemuxDesc.class,
-        DemuxOperator.class));
-    opvec.add(new OpTuple<MuxDesc>(MuxDesc.class,
-        MuxOperator.class));
-    opvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
-        AppMasterEventOperator.class));
-    opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
-        AppMasterEventOperator.class));
-    opvec.add(new OpTuple<SparkPartitionPruningSinkDesc>(SparkPartitionPruningSinkDesc.class,
-        SparkPartitionPruningSinkOperator.class));
-    opvec.add(new OpTuple<RCFileMergeDesc>(RCFileMergeDesc.class,
-        RCFileMergeOperator.class));
-    opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
-        OrcFileMergeOperator.class));
-    opvec.add(new OpTuple<CommonMergeJoinDesc>(CommonMergeJoinDesc.class,
-        CommonMergeJoinOperator.class));
-    opvec.add(new OpTuple<ListSinkDesc>(ListSinkDesc.class,
-        ListSinkOperator.class));
+    opvec.put(FilterDesc.class, FilterOperator.class);
+    opvec.put(SelectDesc.class, SelectOperator.class);
+    opvec.put(ForwardDesc.class, ForwardOperator.class);
+    opvec.put(FileSinkDesc.class, FileSinkOperator.class);
+    opvec.put(CollectDesc.class, CollectOperator.class);
+    opvec.put(ScriptDesc.class, ScriptOperator.class);
+    opvec.put(PTFDesc.class, PTFOperator.class);
+    opvec.put(ReduceSinkDesc.class, ReduceSinkOperator.class);
+    opvec.put(GroupByDesc.class, GroupByOperator.class);
+    opvec.put(JoinDesc.class, JoinOperator.class);
+    opvec.put(MapJoinDesc.class, MapJoinOperator.class);
+    opvec.put(SMBJoinDesc.class, SMBMapJoinOperator.class);
+    opvec.put(LimitDesc.class, LimitOperator.class);
+    opvec.put(TableScanDesc.class, TableScanOperator.class);
+    opvec.put(UnionDesc.class, UnionOperator.class);
+    opvec.put(UDTFDesc.class, UDTFOperator.class);
+    opvec.put(LateralViewJoinDesc.class, LateralViewJoinOperator.class);
+    opvec.put(LateralViewForwardDesc.class, LateralViewForwardOperator.class);
+    opvec.put(HashTableDummyDesc.class, HashTableDummyOperator.class);
+    opvec.put(HashTableSinkDesc.class, HashTableSinkOperator.class);
+    opvec.put(SparkHashTableSinkDesc.class, SparkHashTableSinkOperator.class);
+    opvec.put(DummyStoreDesc.class, DummyStoreOperator.class);
+    opvec.put(DemuxDesc.class, DemuxOperator.class);
+    opvec.put(MuxDesc.class, MuxOperator.class);
+    opvec.put(AppMasterEventDesc.class, AppMasterEventOperator.class);
+    opvec.put(DynamicPruningEventDesc.class, AppMasterEventOperator.class);
+    opvec.put(SparkPartitionPruningSinkDesc.class, SparkPartitionPruningSinkOperator.class);
+    opvec.put(RCFileMergeDesc.class, RCFileMergeOperator.class);
+    opvec.put(OrcFileMergeDesc.class, OrcFileMergeOperator.class);
+    opvec.put(CommonMergeJoinDesc.class, CommonMergeJoinOperator.class);
+    opvec.put(ListSinkDesc.class, ListSinkOperator.class);
   }
 
   static {
-    vectorOpvec = new ArrayList<OpTuple>();
-    vectorOpvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
-        VectorAppMasterEventOperator.class));
-    vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
-        VectorAppMasterEventOperator.class));
-    vectorOpvec.add(new OpTuple<SparkPartitionPruningSinkDesc>(
-        SparkPartitionPruningSinkDesc.class,
-        VectorSparkPartitionPruningSinkOperator.class));
-    vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
-    vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
-    vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class));
-    vectorOpvec.add(new OpTuple<SMBJoinDesc>(SMBJoinDesc.class, VectorSMBMapJoinOperator.class));
-    vectorOpvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class,
-        VectorReduceSinkOperator.class));
-    vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class));
-    vectorOpvec.add(new OpTuple<FilterDesc>(FilterDesc.class, VectorFilterOperator.class));
-    vectorOpvec.add(new OpTuple<LimitDesc>(LimitDesc.class, VectorLimitOperator.class));
-    vectorOpvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class,
-        VectorSparkHashTableSinkOperator.class));
-  }
-
-  private static final class OpTuple<T extends OperatorDesc> {
-    private final Class<T> descClass;
-    private final Class<? extends Operator<?>> opClass;
-
-    public OpTuple(Class<T> descClass, Class<? extends Operator<?>> opClass) {
-      this.descClass = descClass;
-      this.opClass = opClass;
-    }
+    vectorOpvec.put(AppMasterEventDesc.class, VectorAppMasterEventOperator.class);
+    vectorOpvec.put(DynamicPruningEventDesc.class, VectorAppMasterEventOperator.class);
+    vectorOpvec.put(
+        SparkPartitionPruningSinkDesc.class, VectorSparkPartitionPruningSinkOperator.class);
+    vectorOpvec.put(SelectDesc.class, VectorSelectOperator.class);
+    vectorOpvec.put(GroupByDesc.class, VectorGroupByOperator.class);
+    vectorOpvec.put(MapJoinDesc.class, VectorMapJoinOperator.class);
+    vectorOpvec.put(SMBJoinDesc.class, VectorSMBMapJoinOperator.class);
+    vectorOpvec.put(ReduceSinkDesc.class, VectorReduceSinkOperator.class);
+    vectorOpvec.put(FileSinkDesc.class, VectorFileSinkOperator.class);
+    vectorOpvec.put(FilterDesc.class, VectorFilterOperator.class);
+    vectorOpvec.put(LimitDesc.class, VectorLimitOperator.class);
+    vectorOpvec.put(SparkHashTableSinkDesc.class, VectorSparkHashTableSinkOperator.class);
   }
 
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(
-    Class<? extends Operator<?>> opClass, T conf, VectorizationContext vContext) throws HiveException {
+    Class<? extends Operator<?>> opClass, CompilationOpContext cContext, T conf,
+        VectorizationContext vContext) throws HiveException {
     try {
       Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor(
-          VectorizationContext.class, OperatorDesc.class).newInstance(
-          vContext, conf);
+          CompilationOpContext.class, VectorizationContext.class, OperatorDesc.class)
+          .newInstance(cContext, vContext, conf);
       return op;
     } catch (Exception e) {
       e.printStackTrace();
@@ -177,50 +152,49 @@ public final class OperatorFactory {
     }
   }
 
-  public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf,
-      VectorizationContext vContext) throws HiveException {
+  public static <T extends OperatorDesc> Operator<T> getVectorOperator(
+      CompilationOpContext cContext, T conf, VectorizationContext vContext) throws HiveException {
     Class<T> descClass = (Class<T>) conf.getClass();
-    for (OpTuple o : vectorOpvec) {
-      if (o.descClass == descClass) {
-        return getVectorOperator(o.opClass, conf, vContext);
-      }
+    Class<?> opClass = vectorOpvec.get(descClass);
+    if (opClass != null) {
+      return getVectorOperator(vectorOpvec.get(descClass), cContext, conf, vContext);
     }
-    throw new HiveException("No vector operator for descriptor class "
-        + descClass.getName());
+    throw new HiveException("No vector operator for descriptor class " + descClass.getName());
   }
 
-  public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
-
-    for (OpTuple o : opvec) {
-      if (o.descClass == opClass) {
-        try {
-          Operator<T> op = (Operator<T>) o.opClass.newInstance();
-          return op;
-        } catch (Exception e) {
-          e.printStackTrace();
-          throw new RuntimeException(e);
-        }
+  public static <T extends OperatorDesc> Operator<T> get(
+      CompilationOpContext cContext, Class<T> descClass) {
+    Preconditions.checkNotNull(cContext);
+    Class<?> opClass = opvec.get(descClass);
+    if (opClass != null) {
+      try {
+        return (Operator<T>)opClass.getDeclaredConstructor(
+          CompilationOpContext.class).newInstance(cContext);
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
       }
     }
-    throw new RuntimeException("No operator for descriptor class "
-        + opClass.getName());
+    throw new RuntimeException("No operator for descriptor class " + descClass.getName());
   }
 
-  public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass,
-      RowSchema rwsch) {
-
-    Operator<T> ret = get(opClass);
-    ret.setSchema(rwsch);
-    return ret;
+  /**
+   * Returns an operator for the given conf, created in the given compilation context.
+   */
+  public static <T extends OperatorDesc> Operator<T> get(CompilationOpContext cContext, T conf) {
+    Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
+    ret.setConf(conf);
+    return (ret);
   }
 
   /**
    * Returns an operator given the conf and a list of children operators.
    */
   public static <T extends OperatorDesc> Operator<T> get(T conf,
-    Operator<? extends OperatorDesc>... oplist) {
-    Operator<T> ret = get((Class<T>) conf.getClass());
+    Operator<? extends OperatorDesc> oplist0, Operator<? extends OperatorDesc>... oplist) {
+    Operator<T> ret = get(oplist0.getCompilationOpContext(), (Class<T>) conf.getClass());
     ret.setConf(conf);
+    makeChild(ret, oplist0);
     makeChild(ret, oplist);
     return (ret);
   }
@@ -253,27 +227,28 @@ public final class OperatorFactory {
   /**
    * Returns an operator given the conf and a list of children operators.
    */
-  public static <T extends OperatorDesc> Operator<T> get(T conf,
-      RowSchema rwsch, Operator... oplist) {
-    Operator<T> ret = get(conf, oplist);
+  public static <T extends OperatorDesc> Operator<T> get(
+      CompilationOpContext cContext, T conf, RowSchema rwsch) {
+    Operator<T> ret = get(cContext, conf);
     ret.setSchema(rwsch);
     return (ret);
   }
 
+
   /**
    * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
-      Operator... oplist) {
-    Operator<T> ret = get((Class<T>) conf.getClass());
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
+      T conf, Operator oplist0, Operator... oplist) {
+    Operator<T> ret = get(oplist0.getCompilationOpContext(), (Class<T>) conf.getClass());
     ret.setConf(conf);
-    if (oplist.length == 0) {
-      return (ret);
-    }
 
     // Add the new operator as child of each of the passed in operators
+    List<Operator> children = oplist0.getChildOperators();
+    children.add(ret);
+    oplist0.setChildOperators(children);
     for (Operator op : oplist) {
-      List<Operator> children = op.getChildOperators();
+      children = op.getChildOperators();
       children.add(ret);
       op.setChildOperators(children);
     }
@@ -281,6 +256,7 @@ public final class OperatorFactory {
     // add parents for the newly created operator
     List<Operator<? extends OperatorDesc>> parent =
       new ArrayList<Operator<? extends OperatorDesc>>();
+    parent.add(oplist0);
     for (Operator op : oplist) {
       parent.add(op);
     }
@@ -293,9 +269,9 @@ public final class OperatorFactory {
   /**
    * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
-      List<Operator<? extends OperatorDesc>> oplist) {
-    Operator<T> ret = get((Class<T>) conf.getClass());
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
+      T conf, List<Operator<? extends OperatorDesc>> oplist) {
+    Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
     ret.setConf(conf);
     if (oplist.size() == 0) {
       return ret;
@@ -322,9 +298,49 @@ public final class OperatorFactory {
   /**
    * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
-      RowSchema rwsch, Operator... oplist) {
-    Operator<T> ret = getAndMakeChild(conf, oplist);
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
+      CompilationOpContext cContext, T conf, RowSchema rwsch) {
+    Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
+    ret.setConf(conf);
+    ret.setSchema(rwsch);
+    return ret;
+  }
+
+  /**
+   * Returns an operator with the given conf and schema, added as a child of each given parent.
+   */
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
+      CompilationOpContext ctx, T conf, RowSchema rwsch, Operator[] oplist) {
+    Operator<T> ret = get(ctx, (Class<T>) conf.getClass());
+    ret.setConf(conf);
+    ret.setSchema(rwsch);
+    if (oplist.length == 0) return ret;
+
+    // Add the new operator as child of each of the passed in operators
+    for (Operator op : oplist) {
+      List<Operator> children = op.getChildOperators();
+      children.add(ret);
+      op.setChildOperators(children);
+    }
+
+    // add parents for the newly created operator
+    List<Operator<? extends OperatorDesc>> parent =
+      new ArrayList<Operator<? extends OperatorDesc>>();
+    for (Operator op : oplist) {
+      parent.add(op);
+    }
+
+    ret.setParentOperators(parent);
+
+    return (ret);
+  }
+
+  /**
+   * Returns an operator given the conf and a list of parent operators.
+   */
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
+      T conf, RowSchema rwsch, Operator oplist0, Operator... oplist) {
+    Operator<T> ret = getAndMakeChild(conf, oplist0, oplist);
     ret.setSchema(rwsch);
     return ret;
   }
@@ -332,9 +348,9 @@ public final class OperatorFactory {
   /**
    * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
-      RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap, Operator... oplist) {
-    Operator<T> ret = getAndMakeChild(conf, rwsch, oplist);
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, RowSchema rwsch,
+      Map<String, ExprNodeDesc> colExprMap, Operator oplist0, Operator... oplist) {
+    Operator<T> ret = getAndMakeChild(conf, rwsch, oplist0, oplist);
     ret.setColumnExprMap(colExprMap);
     return (ret);
   }
@@ -342,9 +358,9 @@ public final class OperatorFactory {
   /**
    * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
-      RowSchema rwsch, List<Operator<? extends OperatorDesc>> oplist) {
-    Operator<T> ret = getAndMakeChild(conf, oplist);
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
+      T conf, RowSchema rwsch, List<Operator<? extends OperatorDesc>> oplist) {
+    Operator<T> ret = getAndMakeChild(cContext, conf, oplist);
     ret.setSchema(rwsch);
     return (ret);
   }
@@ -352,9 +368,10 @@ public final class OperatorFactory {
  /**
    * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
-      RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap, List<Operator<? extends OperatorDesc>> oplist) {
-    Operator<T> ret = getAndMakeChild(conf, rwsch, oplist);
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
+      T conf, RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap,
+      List<Operator<? extends OperatorDesc>> oplist) {
+    Operator<T> ret = getAndMakeChild(cContext, conf, rwsch, oplist);
     ret.setColumnExprMap(colExprMap);
     return (ret);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
index 2c9deac..445cf3d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.IOException;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.io.orc.Writer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,15 @@ public class OrcFileMergeOperator extends
   private Reader reader;
   private FSDataInputStream fdis;
 
+  /** Kryo ctor. */
+  protected OrcFileMergeOperator() {
+    super();
+  }
+
+  public OrcFileMergeOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   public void process(Object row, int tag) throws HiveException {
     Object[] keyValue = (Object[]) row;

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
index 113ac21..2e9e539 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
@@ -26,6 +26,7 @@ import java.util.Stack;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -60,6 +61,15 @@ public class PTFOperator extends Operator<PTFDesc> implements Serializable {
   transient Configuration hiveConf;
   transient PTFInvocation ptfInvocation;
 
+  /** Kryo ctor. */
+  protected PTFOperator() {
+    super();
+  }
+
+  public PTFOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   /*
    * 1. Find out if the operator is invoked at Map-Side or Reduce-side
    * 2. Get the deserialized QueryDef

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
index c34454c..4dea1d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;
@@ -36,12 +37,22 @@ import java.io.IOException;
  */
 public class RCFileMergeOperator
     extends AbstractFileMergeOperator<RCFileMergeDesc> {
+
   public final static Logger LOG = LoggerFactory.getLogger("RCFileMergeMapper");
 
   RCFile.Writer outWriter;
   CompressionCodec codec = null;
   int columnNumber = 0;
 
+  /** Kryo ctor. */
+  protected RCFileMergeOperator() {
+    super();
+  }
+
+  public RCFileMergeOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   public void process(Object row, int tag) throws HiveException {
     Object[] keyValue = (Object[]) row;

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 4b65952..74b4802 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -151,6 +152,15 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
   protected transient long logEveryNRows = 0;
   private final transient LongWritable recordCounter = new LongWritable();
 
+  /** Kryo ctor. */
+  protected ReduceSinkOperator() {
+    super();
+  }
+
+  public ReduceSinkOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);