You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/12/08 20:08:11 UTC

[1/2] hive git commit: HIVE-12302: Use KryoPool instead of thread-local caching (Prasanth Jayachandran reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 560e4feba -> 2bb5e63c9


http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index 058d63d..cb70ac8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -177,7 +178,7 @@ public class MapRedTask extends ExecDriver implements Serializable {
       OutputStream out = null;
       try {
         out = FileSystem.getLocal(conf).create(planPath);
-        Utilities.serializePlan(plan, out, conf);
+        SerializationUtilities.serializePlan(plan, out, conf);
         out.close();
         out = null;
       } finally {

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index bfe21db..cb7dfa1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -33,6 +33,7 @@ import java.util.Properties;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -159,7 +160,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
       OutputStream out = null;
       try {
         out = FileSystem.getLocal(conf).create(planPath);
-        Utilities.serializePlan(plan, out, conf);
+        SerializationUtilities.serializePlan(plan, out, conf);
         out.close();
         out = null;
       } finally {

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index a0c9b98..f2f3c09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -29,15 +29,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hive.common.util.HashCodeUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
 import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
@@ -58,6 +55,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hive.common.util.BloomFilter;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
 
@@ -159,8 +159,13 @@ public class HybridHashTableContainer
       } else {
         InputStream inputStream = Files.newInputStream(hashMapLocalPath);
         com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream);
-        Kryo kryo = Utilities.runtimeSerializationKryo.get();
-        BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
+        Kryo kryo = SerializationUtilities.borrowKryo();
+        BytesBytesMultiHashMap restoredHashMap = null;
+        try {
+          restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
+        } finally {
+          SerializationUtilities.releaseKryo(kryo);
+        }
 
         if (rowCount > 0) {
           restoredHashMap.expandAndRehashToTarget(rowCount);
@@ -551,10 +556,14 @@ public class HybridHashTableContainer
 
     com.esotericsoftware.kryo.io.Output output =
         new com.esotericsoftware.kryo.io.Output(outputStream);
-    Kryo kryo = Utilities.runtimeSerializationKryo.get();
-    kryo.writeObject(output, partition.hashMap);  // use Kryo to serialize hashmap
-    output.close();
-    outputStream.close();
+    Kryo kryo = SerializationUtilities.borrowKryo();
+    try {
+      kryo.writeObject(output, partition.hashMap);  // use Kryo to serialize hashmap
+      output.close();
+      outputStream.close();
+    } finally {
+      SerializationUtilities.releaseKryo(kryo);
+    }
 
     partition.hashMapLocalPath = path;
     partition.hashMapOnDisk = true;

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
index 6d391a3..a976de0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
@@ -17,21 +17,22 @@
  */
 package org.apache.hadoop.hive.ql.exec.persistence;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
 
 /**
  * An eager object container that puts every row directly to output stream.
@@ -58,14 +59,11 @@ public class ObjectContainer<ROW> {
   private Input input;
   private Output output;
 
-  private Kryo kryo;
-
   public ObjectContainer() {
     readBuffer = (ROW[]) new Object[IN_MEMORY_NUM_ROWS];
     for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) {
       readBuffer[i] = (ROW) new Object();
     }
-    kryo = Utilities.runtimeSerializationKryo.get();
     try {
       setupOutput();
     } catch (IOException | HiveException e) {
@@ -101,7 +99,12 @@ public class ObjectContainer<ROW> {
   }
 
   public void add(ROW row) {
-    kryo.writeClassAndObject(output, row);
+    Kryo kryo = SerializationUtilities.borrowKryo();
+    try {
+      kryo.writeClassAndObject(output, row);
+    } finally {
+      SerializationUtilities.releaseKryo(kryo);
+    }
     rowsOnDisk++;
   }
 
@@ -164,8 +167,13 @@ public class ObjectContainer<ROW> {
             rowsInReadBuffer = rowsOnDisk;
           }
 
-          for (int i = 0; i < rowsInReadBuffer; i++) {
-            readBuffer[i] = (ROW) kryo.readClassAndObject(input);
+          Kryo kryo = SerializationUtilities.borrowKryo();
+          try {
+            for (int i = 0; i < rowsInReadBuffer; i++) {
+              readBuffer[i] = (ROW) kryo.readClassAndObject(input);
+            }
+          } finally {
+            SerializationUtilities.releaseKryo(kryo);
           }
 
           if (input.eof()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
index fd7109a..d7c278a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
@@ -24,11 +24,12 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.mapred.JobConf;
 
+import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
@@ -39,15 +40,28 @@ public class KryoSerializer {
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     Output output = new Output(stream);
 
-    Utilities.sparkSerializationKryo.get().writeObject(output, object);
+    Kryo kryo = SerializationUtilities.borrowKryo();
+    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+    try {
+      kryo.writeObject(output, object);
+    } finally {
+      SerializationUtilities.releaseKryo(kryo);
+    }
 
     output.close(); // close() also calls flush()
     return stream.toByteArray();
   }
 
   public static <T> T deserialize(byte[] buffer, Class<T> clazz) {
-    return Utilities.sparkSerializationKryo.get().readObject(
-        new Input(new ByteArrayInputStream(buffer)), clazz);
+    Kryo kryo = SerializationUtilities.borrowKryo();
+    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+    T result = null;
+    try {
+      result = kryo.readObject(new Input(new ByteArrayInputStream(buffer)), clazz);
+    } finally {
+      SerializationUtilities.releaseKryo(kryo);
+    }
+    return result;
   }
 
   public static byte[] serializeJobConf(JobConf jobConf) {
@@ -80,8 +94,4 @@ public class KryoSerializer {
     return conf;
   }
 
-  public static void setClassLoader(ClassLoader classLoader) {
-    Utilities.sparkSerializationKryo.get().setClassLoader(classLoader);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index c4cb2ba..6380774 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -298,7 +298,6 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
       Map<String, Long> addedJars = jc.getAddedJars();
       if (addedJars != null && !addedJars.isEmpty()) {
         SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
-        KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader());
         localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 3feab1a..b19c70a 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -532,14 +533,14 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     if (!hasObj) {
       Serializable filterObject = scanDesc.getFilterObject();
       if (filterObject != null) {
-        serializedFilterObj = Utilities.serializeObject(filterObject);
+        serializedFilterObj = SerializationUtilities.serializeObject(filterObject);
       }
     }
     if (serializedFilterObj != null) {
       jobConf.set(TableScanDesc.FILTER_OBJECT_CONF_STR, serializedFilterObj);
     }
     if (!hasExpr) {
-      serializedFilterExpr = Utilities.serializeExpression(filterExpr);
+      serializedFilterExpr = SerializationUtilities.serializeExpression(filterExpr);
     }
     String filterText = filterExpr.getExprString();
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
index 13390de..017676b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -128,7 +129,7 @@ public class ProjectionPusher {
     }
 
     final String filterText = filterExpr.getExprString();
-    final String filterExprSerialized = Utilities.serializeExpression(filterExpr);
+    final String filterExprSerialized = SerializationUtilities.serializeExpression(filterExpr);
     jobConf.set(
         TableScanDesc.FILTER_TEXT_CONF_STR,
         filterText);

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
index 7e888bc..6d3a134 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
@@ -23,11 +23,9 @@ import java.sql.Timestamp;
 import java.util.List;
 
 import org.apache.commons.codec.binary.Base64;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -51,6 +49,8 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
@@ -433,7 +433,7 @@ public class ConvertAstToSearchArg {
   public static SearchArgument createFromConf(Configuration conf) {
     String sargString;
     if ((sargString = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR)) != null) {
-      return create(Utilities.deserializeExpression(sargString));
+      return create(SerializationUtilities.deserializeExpression(sargString));
     } else if ((sargString = conf.get(SARG_PUSHDOWN)) != null) {
       return create(sargString);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 44189ef..c682df2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -18,10 +18,32 @@
 
 package org.apache.hadoop.hive.ql.metadata;
 
-import com.google.common.collect.Sets;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
+import static org.apache.hadoop.hive.serde.serdeConstants.COLLECTION_DELIM;
+import static org.apache.hadoop.hive.serde.serdeConstants.ESCAPE_CHAR;
+import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
+import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM;
+import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
+import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -81,6 +103,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.FunctionTask;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
@@ -100,32 +123,10 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
-import static org.apache.hadoop.hive.serde.serdeConstants.COLLECTION_DELIM;
-import static org.apache.hadoop.hive.serde.serdeConstants.ESCAPE_CHAR;
-import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
-import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM;
-import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
-import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
-import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
+import com.google.common.collect.Sets;
 
 /**
  * This class has functions that implement meta data/DDL operations using calls
@@ -2087,7 +2088,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
           new ArrayList<ObjectPair<Integer,byte[]>>(partSpecs.size());
       for (DropTableDesc.PartSpec partSpec : partSpecs) {
         partExprs.add(new ObjectPair<Integer, byte[]>(partSpec.getPrefixLength(),
-            Utilities.serializeExpressionToKryo(partSpec.getPartSpec())));
+            SerializationUtilities.serializeExpressionToKryo(partSpec.getPartSpec())));
       }
       List<org.apache.hadoop.hive.metastore.api.Partition> tParts = getMSC().dropPartitions(
           dbName, tblName, partExprs, dropOptions);
@@ -2362,7 +2363,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
   public boolean getPartitionsByExpr(Table tbl, ExprNodeGenericFuncDesc expr, HiveConf conf,
       List<Partition> result) throws HiveException, TException {
     assert result != null;
-    byte[] exprBytes = Utilities.serializeExpressionToKryo(expr);
+    byte[] exprBytes = SerializationUtilities.serializeExpressionToKryo(expr);
     String defaultPartitionName = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME);
     List<org.apache.hadoop.hive.metastore.api.Partition> msParts =
         new ArrayList<org.apache.hadoop.hive.metastore.api.Partition>();

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
index 1f6b5d7..e9ca5fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -485,7 +486,7 @@ public class CommonJoinTaskDispatcher extends AbstractJoinTaskDispatcher impleme
         }
         // deep copy a new mapred work from xml
         // Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan
-        MapredWork newWork = Utilities.clonePlan(currTask.getWork());
+        MapredWork newWork = SerializationUtilities.clonePlan(currTask.getWork());
 
         // create map join task and set big table as i
         MapRedTask newTask = convertTaskToMapJoinTask(newWork, pos);

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index 15f0d70..a71c474 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -18,6 +18,13 @@
 
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -27,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -53,13 +61,6 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * GenMRSkewJoinProcessor.
  *
@@ -253,7 +254,7 @@ public final class GenMRSkewJoinProcessor {
           HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS);
       newPlan.setMapperCannotSpanPartns(mapperCannotSpanPartns);
 
-      MapredWork clonePlan = Utilities.clonePlan(currPlan);
+      MapredWork clonePlan = SerializationUtilities.clonePlan(currPlan);
 
       Operator<? extends OperatorDesc>[] parentOps = new TableScanOperator[tags.length];
       for (int k = 0; k < tags.length; k++) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 895e64e..41d3522 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
-import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -62,12 +65,10 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Preconditions;
 
 /**
  * Copied from GenMRSkewJoinProcessor. It's used for spark task
@@ -254,7 +255,7 @@ public class GenSparkSkewJoinProcessor {
       // this makes sure MJ has the same downstream operator plan as the original join
       List<Operator<?>> reducerList = new ArrayList<Operator<?>>();
       reducerList.add(reduceWork.getReducer());
-      Operator<? extends OperatorDesc> reducer = Utilities.cloneOperatorTree(
+      Operator<? extends OperatorDesc> reducer = SerializationUtilities.cloneOperatorTree(
           parseCtx.getConf(), reducerList).get(0);
       Preconditions.checkArgument(reducer instanceof JoinOperator,
           "Reducer should be join operator, but actually is " + reducer.getName());

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
index e94f6e7..dc433fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
@@ -19,29 +19,18 @@ package org.apache.hadoop.hive.ql.optimizer.physical;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.SortedSet;
 import java.util.Stack;
-import java.util.TreeSet;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
-import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -52,13 +41,14 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * SerializeFilter is a simple physical optimizer that serializes all filter expressions in
@@ -151,7 +141,7 @@ public class SerializeFilter implements PhysicalPlanResolver {
             LOG.debug("Serializing: " + ts.getConf().getFilterExpr().getExprString());
           }
           ts.getConf().setSerializedFilterExpr(
-            Utilities.serializeExpression(ts.getConf().getFilterExpr()));
+              SerializationUtilities.serializeExpression(ts.getConf().getFilterExpr()));
         }
 
         if (ts.getConf() != null && ts.getConf().getFilterObject() != null) {
@@ -160,7 +150,7 @@ public class SerializeFilter implements PhysicalPlanResolver {
           }
 
           ts.getConf().setSerializedFilterObject(
-            Utilities.serializeObject(ts.getConf().getFilterObject()));
+              SerializationUtilities.serializeObject(ts.getConf().getFilterObject()));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
index 3b09c2f..658717c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -147,7 +148,7 @@ public class SortMergeJoinTaskDispatcher extends AbstractJoinTaskDispatcher impl
       throws SemanticException {
     try {
       // deep copy a new mapred work
-      MapredWork currJoinWork = Utilities.clonePlan(currWork);
+      MapredWork currJoinWork = SerializationUtilities.clonePlan(currWork);
       SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
 
       // change the newly created map-red plan as if it was a join operator
@@ -165,7 +166,7 @@ public class SortMergeJoinTaskDispatcher extends AbstractJoinTaskDispatcher impl
       SMBMapJoinOperator smbJoinOp)
       throws UnsupportedEncodingException, SemanticException {
     // deep copy a new mapred work
-    MapredWork newWork = Utilities.clonePlan(origWork);
+    MapredWork newWork = SerializationUtilities.clonePlan(origWork);
     // create a mapred task for this work
     MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
         .getParseContext().getConf());

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
index f9978b4..42ad04b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
@@ -22,13 +22,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.metastore.Metastore.SplitInfo;
 import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
 import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto;
 import org.apache.hadoop.hive.ql.io.orc.ReaderImpl;
@@ -38,6 +36,8 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The basic implementation of PartitionExpressionProxy that uses ql package classes.
@@ -71,7 +71,7 @@ public class PartitionExpressionForMetastore implements PartitionExpressionProxy
   private ExprNodeGenericFuncDesc deserializeExpr(byte[] exprBytes) throws MetaException {
     ExprNodeGenericFuncDesc expr = null;
     try {
-      expr = Utilities.deserializeExpressionFromKryo(exprBytes);
+      expr = SerializationUtilities.deserializeExpressionFromKryo(exprBytes);
     } catch (Exception ex) {
       LOG.error("Failed to deserialize the expression", ex);
       throw new MetaException(ex.getMessage());

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
index fb20080..6931ad9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
@@ -30,8 +30,8 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
@@ -95,7 +95,7 @@ public class SplitSparkWorkResolver implements PhysicalPlanResolver {
     boolean isFirst = true;
 
     for (BaseWork childWork : childWorks) {
-      BaseWork clonedParentWork = Utilities.cloneBaseWork(parentWork);
+      BaseWork clonedParentWork = SerializationUtilities.cloneBaseWork(parentWork);
       // give the cloned work a different name
       clonedParentWork.setName(clonedParentWork.getName().replaceAll("^([a-zA-Z]+)(\\s+)(\\d+)",
           "$1$2" + GenSparkUtils.getUtils().getNextSeqNumber()));

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 27d7276..fe0e234 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
+
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashSet;
@@ -26,8 +28,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -55,12 +56,12 @@ import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 
-import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
-
 /**
  * GenTezUtils is a collection of shared helper methods to produce TezWork.
  * All the methods in this class should be static, but some aren't; this is to facilitate testing.
@@ -216,7 +217,7 @@ public class GenTezUtils {
     roots.addAll(context.eventOperatorSet);
 
     // need to clone the plan.
-    List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+    List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(conf, roots);
 
     // we're cloning the operator plan but we're retaining the original work. That means
     // that root operators have to be replaced with the cloned ops. The replacement map

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 40c23a5..8dc48cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -28,8 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -43,9 +41,9 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
@@ -60,10 +58,12 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 
 /**
  * GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -207,7 +207,7 @@ public class GenSparkUtils {
     }
 
     // need to clone the plan.
-    List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+    List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(conf, roots);
 
     // Build a map to map the original FileSinkOperator and the cloned FileSinkOperators
     // This map is used for set the stats flag for the cloned FileSinkOperators in later process

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
index c140f67..4bb661a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
@@ -24,10 +24,10 @@ import java.util.List;
 import java.util.Set;
 import java.util.Stack;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
@@ -36,6 +36,8 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This processor triggers on SparkPartitionPruningSinkOperator. For a operator tree like
  * this:
@@ -105,7 +107,8 @@ public class SplitOpTreeForDPP implements NodeProcessor {
     filterOp.setChildOperators(Utilities.makeList(selOp));
 
     // Now clone the tree above selOp
-    List<Operator<?>> newRoots = Utilities.cloneOperatorTree(context.parseContext.getConf(), roots);
+    List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(
+        context.parseContext.getConf(), roots);
     for (int i = 0; i < roots.size(); i++) {
       TableScanOperator newTs = (TableScanOperator) newRoots.get(i);
       TableScanOperator oldTs = (TableScanOperator) roots.get(i);

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
index 5c5fafa..e2aaa70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
@@ -24,17 +24,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
 import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 
 public class FSStatsAggregator implements StatsAggregator {
@@ -62,7 +63,12 @@ public class FSStatsAggregator implements StatsAggregator {
       });
       for (FileStatus file : status) {
         Input in = new Input(fs.open(file.getPath()));
-        statsMap = Utilities.runtimeSerializationKryo.get().readObject(in, statsMap.getClass());
+        Kryo kryo = SerializationUtilities.borrowKryo();
+        try {
+          statsMap = kryo.readObject(in, statsMap.getClass());
+        } finally {
+          SerializationUtilities.releaseKryo(kryo);
+        }
         LOG.info("Read stats : " +statsMap);
         statsList.add(statsMap);
         in.close();

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
index 80f954b..e5d89e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
@@ -24,15 +24,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Output;
 
 public class FSStatsPublisher implements StatsPublisher {
@@ -100,7 +101,12 @@ public class FSStatsPublisher implements StatsPublisher {
       Output output = new Output(statsFile.getFileSystem(conf).create(statsFile,true));
       LOG.debug("Created file : " + statsFile);
       LOG.debug("Writing stats in it : " + statsMap);
-      Utilities.runtimeSerializationKryo.get().writeObject(output, statsMap);
+      Kryo kryo = SerializationUtilities.borrowKryo();
+      try {
+        kryo.writeObject(output, statsMap);
+      } finally {
+        SerializationUtilities.releaseKryo(kryo);
+      }
       output.close();
       return true;
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java b/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java
index d6d513d..5e53604 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java
@@ -23,8 +23,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Stack;
 
-import junit.framework.TestCase;
-
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -36,7 +34,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -52,6 +50,8 @@ import org.apache.thrift.TException;
 
 import com.google.common.collect.Lists;
 
+import junit.framework.TestCase;
+
 /**
  * Tests hive metastore expression support. This should be moved in metastore module
  * as soon as we are able to use ql from metastore server (requires splitting metastore
@@ -166,8 +166,8 @@ public class TestMetastoreExpr extends TestCase {
   public void checkExpr(int numParts,
       String dbName, String tblName, ExprNodeGenericFuncDesc expr) throws Exception {
     List<Partition> parts = new ArrayList<Partition>();
-    client.listPartitionsByExpr(
-        dbName, tblName, Utilities.serializeExpressionToKryo(expr), null, (short)-1, parts);
+    client.listPartitionsByExpr(dbName, tblName,
+        SerializationUtilities.serializeExpressionToKryo(expr), null, (short)-1, parts);
     assertEquals("Partition check failed: " + expr.getExprString(), numParts, parts.size());
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
index 1364888..c1667c2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
@@ -23,8 +23,6 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 
-import junit.framework.TestCase;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -37,6 +35,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.JobConf;
 
+import junit.framework.TestCase;
+
 /**
  * TestPlan.
  *
@@ -83,7 +83,7 @@ public class TestPlan extends TestCase {
       JobConf job = new JobConf(TestPlan.class);
       // serialize the configuration once ..
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      Utilities.serializePlan(mrwork, baos, job);
+      SerializationUtilities.serializePlan(mrwork, baos, job);
       baos.close();
       String v1 = baos.toString();
 
@@ -101,7 +101,7 @@ public class TestPlan extends TestCase {
 
       // serialize again
       baos.reset();
-      Utilities.serializePlan(mrwork2, baos, job);
+      SerializationUtilities.serializePlan(mrwork2, baos, job);
       baos.close();
 
       // verify that the two are equal

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 028cdd1..bb6a4e1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -26,16 +26,8 @@ import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
-import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -46,6 +38,14 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
 
 public class TestUtilities extends TestCase {
   public static final Logger LOG = LoggerFactory.getLogger(TestUtilities.class);
@@ -85,8 +85,8 @@ public class TestUtilities extends TestCase {
     children.add(constant);
     ExprNodeGenericFuncDesc desc = new ExprNodeGenericFuncDesc(TypeInfoFactory.timestampTypeInfo,
         new GenericUDFFromUtcTimestamp(), children);
-    assertEquals(desc.getExprString(), Utilities.deserializeExpression(
-        Utilities.serializeExpression(desc)).getExprString());
+    assertEquals(desc.getExprString(), SerializationUtilities.deserializeExpression(
+        SerializationUtilities.serializeExpression(desc)).getExprString());
   }
 
   public void testgetDbTableName() throws HiveException{

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 9f616ab..1ff7eb5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -17,7 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -51,6 +57,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -67,7 +74,6 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy;
-import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger.MyRow;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -1609,7 +1615,7 @@ public class TestInputOutputFormat {
     Path mapXml = new Path(workDir, "map.xml");
     localFs.delete(mapXml, true);
     FSDataOutputStream planStream = localFs.create(mapXml);
-    Utilities.serializePlan(mapWork, planStream, conf);
+    SerializationUtilities.serializePlan(mapWork, planStream, conf);
     planStream.close();
     return conf;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
index 3560c43..7a93b54 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -121,7 +121,7 @@ public class TestOrcSplitElimination {
     childExpr.add(col);
     childExpr.add(con);
     ExprNodeGenericFuncDesc en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    String sargStr = Utilities.serializeExpression(en);
+    String sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     InputSplit[] splits = in.getSplits(conf, 1);
     assertEquals(5, splits.length);
@@ -129,7 +129,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(1);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     assertEquals(0, splits.length);
@@ -137,7 +137,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(2);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     assertEquals(1, splits.length);
@@ -145,7 +145,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(5);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     assertEquals(2, splits.length);
@@ -153,7 +153,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(13);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     assertEquals(3, splits.length);
@@ -161,7 +161,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(29);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     assertEquals(4, splits.length);
@@ -169,7 +169,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(70);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     assertEquals(5, splits.length);
@@ -199,7 +199,7 @@ public class TestOrcSplitElimination {
     childExpr.add(col);
     childExpr.add(con);
     ExprNodeGenericFuncDesc en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    String sargStr = Utilities.serializeExpression(en);
+    String sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     InputSplit[] splits = in.getSplits(conf, 1);
     assertEquals(2, splits.length);
@@ -207,7 +207,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(0);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     // no stripes satisfies the condition
@@ -216,7 +216,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(2);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     // only first stripe will satisfy condition and hence single split
@@ -225,7 +225,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(5);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     // first stripe will satisfy the predicate and will be a single split, last stripe will be a
@@ -235,7 +235,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(13);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     // first 2 stripes will satisfy the predicate and merged to single split, last stripe will be a
@@ -245,7 +245,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(29);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     // first 3 stripes will satisfy the predicate and merged to single split, last stripe will be a
@@ -255,7 +255,7 @@ public class TestOrcSplitElimination {
     con = new ExprNodeConstantDesc(70);
     childExpr.set(1, con);
     en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
-    sargStr = Utilities.serializeExpression(en);
+    sargStr = SerializationUtilities.serializeExpression(en);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     // first 2 stripes will satisfy the predicate and merged to single split, last two stripe will
@@ -304,7 +304,7 @@ public class TestOrcSplitElimination {
     childExpr2.add(en1);
     ExprNodeGenericFuncDesc en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2);
 
-    String sargStr = Utilities.serializeExpression(en2);
+    String sargStr = SerializationUtilities.serializeExpression(en2);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     InputSplit[] splits = in.getSplits(conf, 1);
     assertEquals(2, splits.length);
@@ -321,7 +321,7 @@ public class TestOrcSplitElimination {
     childExpr2.set(1, en1);
     en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2);
 
-    sargStr = Utilities.serializeExpression(en2);
+    sargStr = SerializationUtilities.serializeExpression(en2);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     // no stripe will satisfy the predicate
@@ -339,7 +339,7 @@ public class TestOrcSplitElimination {
     childExpr2.set(1, en1);
     en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2);
 
-    sargStr = Utilities.serializeExpression(en2);
+    sargStr = SerializationUtilities.serializeExpression(en2);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     // only first stripe will satisfy condition and hence single split
@@ -358,7 +358,7 @@ public class TestOrcSplitElimination {
     childExpr2.set(1, en1);
     en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2);
 
-    sargStr = Utilities.serializeExpression(en2);
+    sargStr = SerializationUtilities.serializeExpression(en2);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     // first two stripes will satisfy condition and hence single split
@@ -378,7 +378,7 @@ public class TestOrcSplitElimination {
     childExpr2.set(1, en1);
     en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2);
 
-    sargStr = Utilities.serializeExpression(en2);
+    sargStr = SerializationUtilities.serializeExpression(en2);
     conf.set("hive.io.filter.expr.serialized", sargStr);
     splits = in.getSplits(conf, 1);
     // only second stripes will satisfy condition and hence single split

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index 7204521..bf363f3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -18,12 +18,19 @@
 
 package org.apache.hadoop.hive.ql.io.parquet;
 
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
-import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -34,16 +41,14 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import com.google.common.collect.Lists;
 
 public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
 
@@ -96,7 +101,7 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
     children.add(columnDesc);
     children.add(constantDesc);
     ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
-    String searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+    String searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
 
     ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper)
@@ -109,7 +114,7 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
     constantDesc = new ExprNodeConstantDesc(100);
     children.set(1, constantDesc);
     genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
-    searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+    searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
 
     recordReader = (ParquetRecordReaderWrapper)

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
index e72789d..a0fa700 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
@@ -22,23 +22,22 @@ import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertNull;
 import static junit.framework.Assert.assertTrue;
 
-import com.google.common.collect.Sets;
+import java.beans.XMLDecoder;
+import java.io.ByteArrayInputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Set;
 
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
 import org.junit.Test;
 
-import java.beans.XMLDecoder;
-import java.io.ByteArrayInputStream;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.parquet.filter2.predicate.FilterPredicate;
+import com.google.common.collect.Sets;
 
 /**
  * These tests cover the conversion from Hive's AST to SearchArguments.
@@ -2713,7 +2712,7 @@ public class TestConvertAstToSearchArg {
           "AAABgj0BRVFVQcwBBW9yZy5hcGFjaGUuaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5Q" +
           "EAAAECAQFib29sZWHu";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2732,7 +2731,7 @@ public class TestConvertAstToSearchArg {
             "Y2hlLmhhZG9vcC5oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUH" +
             "MAQVvcmcuYXBhY2hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAgEBYm9vbGVh7g==";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2752,7 +2751,7 @@ public class TestConvertAstToSearchArg {
             "oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQZvcmcuYXBhY2" +
             "hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABBAEBYm9vbGVh7g==";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2772,7 +2771,7 @@ public class TestConvertAstToSearchArg {
             "vb3AuaGl2ZS5xbC51ZGYuZ2VuZXJpYy5HZW5lcmljVURGT1BFcXVh7AEAAAGCPQFFUVVBzAEGb3JnLm" +
             "FwYWNoZS5oYWRvb3AuaW8uQm9vbGVhbldyaXRhYmzlAQAAAQQBAWJvb2xlYe4=";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2792,7 +2791,7 @@ public class TestConvertAstToSearchArg {
             "lLmhhZG9vcC5oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQ" +
             "ZvcmcuYXBhY2hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABBAEBYm9vbGVh7g==";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2811,7 +2810,7 @@ public class TestConvertAstToSearchArg {
             "dmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QRXF1YewBAAABgj0BRVFVQcwBBW9yZy5hcGFjaGU" +
             "uaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5QEAAAECAQFib29sZWHu";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2832,7 +2831,7 @@ public class TestConvertAstToSearchArg {
             "hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAwkBAgEBYrIAAAgBAwkBB29yZy5hcGFjaGUua" +
             "GFkb29wLmhpdmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QQW7kAQEGAQAAAQMJ";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("(and leaf-0 leaf-1)", sarg.getExpression().toString());
     assertEquals(2, sarg.getLeaves().size());
@@ -2854,7 +2853,7 @@ public class TestConvertAstToSearchArg {
             "aXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQVvcmcuYXBhY2h" +
             "lLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAgEBYm9vbGVh7g==";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2873,7 +2872,7 @@ public class TestConvertAstToSearchArg {
             "b29wLmhpdmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QRXF1YewBAAABgj0BRVFVQcwBBW9yZy5" +
             "hcGFjaGUuaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5QEAAAECAQFib29sZWHu";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());


[2/2] hive git commit: HIVE-12302: Use KryoPool instead of thread-local caching (Prasanth Jayachandran reviewed by Ashutosh Chauhan)

Posted by pr...@apache.org.
HIVE-12302: Use KryoPool instead of thread-local caching (Prasanth Jayachandran reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2bb5e63c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2bb5e63c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2bb5e63c

Branch: refs/heads/master
Commit: 2bb5e63c96c58d81d4565e9633accbd5133b33af
Parents: 560e4fe
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Tue Dec 8 13:07:09 2015 -0600
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Tue Dec 8 13:07:09 2015 -0600

----------------------------------------------------------------------
 .../predicate/AccumuloPredicateHandler.java     |   4 +-
 .../predicate/TestAccumuloPredicateHandler.java |  36 +-
 .../hive/hbase/HiveHBaseTableInputFormat.java   |  11 +-
 .../hive/hcatalog/api/HCatClientHMSImpl.java    |  26 +-
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |   3 +-
 .../hadoop/hive/ql/exec/MapJoinOperator.java    |   6 +-
 .../apache/hadoop/hive/ql/exec/PTFUtils.java    |   2 +-
 .../hive/ql/exec/SerializationUtilities.java    | 730 +++++++++++++++++++
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 695 +-----------------
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      |   6 +-
 .../hadoop/hive/ql/exec/mr/MapRedTask.java      |   3 +-
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java |   3 +-
 .../persistence/HybridHashTableContainer.java   |  29 +-
 .../ql/exec/persistence/ObjectContainer.java    |  40 +-
 .../hive/ql/exec/spark/KryoSerializer.java      |  28 +-
 .../ql/exec/spark/RemoteHiveSparkClient.java    |   1 -
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   5 +-
 .../hive/ql/io/parquet/ProjectionPusher.java    |   3 +-
 .../hive/ql/io/sarg/ConvertAstToSearchArg.java  |   8 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  61 +-
 .../physical/CommonJoinTaskDispatcher.java      |   3 +-
 .../physical/GenMRSkewJoinProcessor.java        |  17 +-
 .../physical/GenSparkSkewJoinProcessor.java     |  19 +-
 .../ql/optimizer/physical/SerializeFilter.java  |  24 +-
 .../physical/SortMergeJoinTaskDispatcher.java   |   5 +-
 .../ppr/PartitionExpressionForMetastore.java    |   8 +-
 .../optimizer/spark/SplitSparkWorkResolver.java |   4 +-
 .../hadoop/hive/ql/parse/GenTezUtils.java       |  11 +-
 .../hive/ql/parse/spark/GenSparkUtils.java      |  10 +-
 .../hive/ql/parse/spark/SplitOpTreeForDPP.java  |   7 +-
 .../hive/ql/stats/fs/FSStatsAggregator.java     |  14 +-
 .../hive/ql/stats/fs/FSStatsPublisher.java      |  14 +-
 .../hive/metastore/TestMetastoreExpr.java       |  10 +-
 .../apache/hadoop/hive/ql/exec/TestPlan.java    |   8 +-
 .../hadoop/hive/ql/exec/TestUtilities.java      |  20 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  12 +-
 .../hive/ql/io/orc/TestOrcSplitElimination.java |  40 +-
 .../io/parquet/TestParquetRowGroupFilter.java   |  27 +-
 .../ql/io/sarg/TestConvertAstToSearchArg.java   |  35 +-
 39 files changed, 1054 insertions(+), 934 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
index 2c0e3c2..d5cc9a5 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.hive.accumulo.predicate.compare.NotEqual;
 import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
 import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;
 import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -358,7 +358,7 @@ public class AccumuloPredicateHandler {
     if (filteredExprSerialized == null)
       return null;
 
-    return Utilities.deserializeExpression(filteredExprSerialized);
+    return SerializationUtilities.deserializeExpression(filteredExprSerialized);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
index 15ccda7..88e4530 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;
 import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
 import org.apache.hadoop.hive.accumulo.serde.TooManyAccumuloColumnsException;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -117,7 +117,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPEqual(), children);
     assertNotNull(node);
-    String filterExpr = Utilities.serializeExpression(node);
+    String filterExpr = SerializationUtilities.serializeExpression(node);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     List<IndexSearchCondition> sConditions = handler.getSearchConditions(conf);
@@ -134,7 +134,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPEqual(), children);
     assertNotNull(node);
-    String filterExpr = Utilities.serializeExpression(node);
+    String filterExpr = SerializationUtilities.serializeExpression(node);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -157,7 +157,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPGreaterThan(), children);
     assertNotNull(node);
-    String filterExpr = Utilities.serializeExpression(node);
+    String filterExpr = SerializationUtilities.serializeExpression(node);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -182,7 +182,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPEqualOrGreaterThan(), children);
     assertNotNull(node);
-    String filterExpr = Utilities.serializeExpression(node);
+    String filterExpr = SerializationUtilities.serializeExpression(node);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -206,7 +206,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPLessThan(), children);
     assertNotNull(node);
-    String filterExpr = Utilities.serializeExpression(node);
+    String filterExpr = SerializationUtilities.serializeExpression(node);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -231,7 +231,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPEqualOrLessThan(), children);
     assertNotNull(node);
-    String filterExpr = Utilities.serializeExpression(node);
+    String filterExpr = SerializationUtilities.serializeExpression(node);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -273,7 +273,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPAnd(), bothFilters);
 
-    String filterExpr = Utilities.serializeExpression(both);
+    String filterExpr = SerializationUtilities.serializeExpression(both);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -309,7 +309,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPAnd(), bothFilters);
 
-    String filterExpr = Utilities.serializeExpression(both);
+    String filterExpr = SerializationUtilities.serializeExpression(both);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     List<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -329,7 +329,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPEqual(), children);
     assertNotNull(node);
-    String filterExpr = Utilities.serializeExpression(node);
+    String filterExpr = SerializationUtilities.serializeExpression(node);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     List<IndexSearchCondition> sConditions = handler.getSearchConditions(conf);
@@ -356,7 +356,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPEqual(), children);
     assertNotNull(node);
-    String filterExpr = Utilities.serializeExpression(node);
+    String filterExpr = SerializationUtilities.serializeExpression(node);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
     List<IndexSearchCondition> sConditions = handler.getSearchConditions(conf);
     assertEquals(sConditions.size(), 1);
@@ -375,7 +375,7 @@ public class TestAccumuloPredicateHandler {
       ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
           new GenericUDFOPNotNull(), children);
       assertNotNull(node);
-      String filterExpr = Utilities.serializeExpression(node);
+      String filterExpr = SerializationUtilities.serializeExpression(node);
       conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
       List<IndexSearchCondition> sConditions = handler.getSearchConditions(conf);
       assertEquals(sConditions.size(), 1);
@@ -417,7 +417,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPAnd(), bothFilters);
 
-    String filterExpr = Utilities.serializeExpression(both);
+    String filterExpr = SerializationUtilities.serializeExpression(both);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
     try {
       List<IteratorSetting> iterators = handler.getIterators(conf, columnMapper);
@@ -468,7 +468,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPAnd(), bothFilters);
 
-    String filterExpr = Utilities.serializeExpression(both);
+    String filterExpr = SerializationUtilities.serializeExpression(both);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
     conf.setBoolean(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY, false);
     try {
@@ -519,7 +519,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPAnd(), bothFilters);
 
-    String filterExpr = Utilities.serializeExpression(both);
+    String filterExpr = SerializationUtilities.serializeExpression(both);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
     List<IteratorSetting> iterators = handler.getIterators(conf, columnMapper);
     assertEquals(iterators.size(), 2);
@@ -665,7 +665,7 @@ public class TestAccumuloPredicateHandler {
     ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPAnd(), bothFilters);
 
-    String filterExpr = Utilities.serializeExpression(both);
+    String filterExpr = SerializationUtilities.serializeExpression(both);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     // Should make ['f', 'm\0')
@@ -697,7 +697,7 @@ public class TestAccumuloPredicateHandler {
         new GenericUDFOPLessThan(), children);
     assertNotNull(node);
 
-    String filterExpr = Utilities.serializeExpression(node);
+    String filterExpr = SerializationUtilities.serializeExpression(node);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     // Should make (100, +inf)
@@ -738,7 +738,7 @@ public class TestAccumuloPredicateHandler {
         new GenericUDFOPLessThan(), children);
     assertNotNull(node);
 
-    String filterExpr = Utilities.serializeExpression(node);
+    String filterExpr = SerializationUtilities.serializeExpression(node);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
 
     // Should make (100, +inf)

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index b17714f..88d1865 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -20,12 +20,9 @@ package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -39,7 +36,7 @@ import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
 import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -74,6 +71,8 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler
@@ -187,7 +186,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     Scan scan = new Scan();
     String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
     if (filterObjectSerialized != null) {
-      HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized,
+      HBaseScanRange range = SerializationUtilities.deserializeObject(filterObjectSerialized,
           HBaseScanRange.class);
       try {
         range.setup(scan, jobConf);
@@ -203,7 +202,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     }
 
     ExprNodeGenericFuncDesc filterExpr =
-      Utilities.deserializeExpression(filterExprSerialized);
+        SerializationUtilities.deserializeExpression(filterExprSerialized);
 
     String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
     String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey];

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
index 41571fc..4ab497e 100644
--- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
@@ -18,9 +18,15 @@
  */
 package org.apache.hive.hcatalog.api;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -47,7 +53,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -68,13 +74,9 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * The HCatClientHMSImpl is the Hive Metastore client based implementation of
@@ -584,7 +586,7 @@ public class HCatClientHMSImpl extends HCatClient {
     ExprNodeGenericFuncDesc partitionExpression = new ExpressionBuilder(table, partitionSpec).build();
     ObjectPair<Integer, byte[]> serializedPartitionExpression =
         new ObjectPair<Integer, byte[]>(partitionSpec.size(),
-            Utilities.serializeExpressionToKryo(partitionExpression));
+            SerializationUtilities.serializeExpressionToKryo(partitionExpression));
     hmsClient.dropPartitions(table.getDbName(), table.getTableName(), Arrays.asList(serializedPartitionExpression),
         deleteData && !isExternal(table),  // Delete data?
         ifExists,                          // Fail if table doesn't exist?

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 6713a2f..5b2c8c2 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -57,6 +57,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -1352,7 +1353,7 @@ public class QTestUtil {
     try {
       conf.set(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "javaXML");
       for (Task<? extends Serializable> plan : tasks) {
-        Utilities.serializePlan(plan, ofs, conf);
+        SerializationUtilities.serializePlan(plan, ofs, conf);
       }
       ofs.close();
       fixXml4JDK7(outf.getPath());

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index cab0fc8..fbc5ea4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -21,16 +21,12 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -68,6 +64,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hive.common.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Map side Join operator implementation.

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
index 6c11637..721fbaa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
@@ -40,7 +40,7 @@ import java.util.Stack;
 import org.antlr.runtime.CommonToken;
 import org.antlr.runtime.tree.BaseTree;
 import org.antlr.runtime.tree.CommonTree;
-import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities.EnumDelegate;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
new file mode 100644
index 0000000..d5e946e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -0,0 +1,730 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.beans.DefaultPersistenceDelegate;
+import java.beans.Encoder;
+import java.beans.ExceptionListener;
+import java.beans.Expression;
+import java.beans.PersistenceDelegate;
+import java.beans.Statement;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.antlr.runtime.CommonToken;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantStructObjectInspector;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.pool.KryoFactory;
+import com.esotericsoftware.kryo.pool.KryoPool;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+
+/**
+ * Utilities related to serialization and deserialization.
+ */
+public class SerializationUtilities {
+  private static final String CLASS_NAME = SerializationUtilities.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  private static KryoFactory factory = new KryoFactory() {
+    public Kryo create() {
+      Kryo kryo = new Kryo();
+      kryo.register(java.sql.Date.class, new SqlDateSerializer());
+      kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
+      kryo.register(Path.class, new PathSerializer());
+      kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
+      ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
+          .setFallbackInstantiatorStrategy(
+              new StdInstantiatorStrategy());
+      removeField(kryo, Operator.class, "colExprMap");
+      removeField(kryo, AbstractOperatorDesc.class, "statistics");
+      kryo.register(MapWork.class);
+      kryo.register(ReduceWork.class);
+      kryo.register(TableDesc.class);
+      kryo.register(UnionOperator.class);
+      kryo.register(FileSinkOperator.class);
+      kryo.register(HiveIgnoreKeyTextOutputFormat.class);
+      kryo.register(StandardConstantListObjectInspector.class);
+      kryo.register(StandardConstantMapObjectInspector.class);
+      kryo.register(StandardConstantStructObjectInspector.class);
+      kryo.register(SequenceFileInputFormat.class);
+      kryo.register(HiveSequenceFileOutputFormat.class);
+      kryo.register(SparkEdgeProperty.class);
+      kryo.register(SparkWork.class);
+      kryo.register(Pair.class);
+      return kryo;
+    }
+  };
+
+  // Bounded queue could be specified here but that will lead to blocking.
+  // ConcurrentLinkedQueue is unbounded and will release soft referenced kryo instances under
+  // memory pressure.
+  private static KryoPool kryoPool = new KryoPool.Builder(factory).softReferences().build();
+
+  /**
+   * By default, kryo pool uses ConcurrentLinkedQueue which is unbounded. To facilitate reuse of
+   * kryo object call releaseKryo() after done using the kryo instance. The class loader for the
+   * kryo instance will be set to current thread's context class loader.
+   *
+   * @return kryo instance
+   */
+  public static Kryo borrowKryo() {
+    Kryo kryo = kryoPool.borrow();
+    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+    return kryo;
+  }
+
+  /**
+   * Release kryo instance back to the pool.
+   *
+   * @param kryo - kryo instance to be released
+   */
+  public static void releaseKryo(Kryo kryo) {
+    kryoPool.release(kryo);
+  }
+
+  private static void removeField(Kryo kryo, Class type, String fieldName) {
+    FieldSerializer fld = new FieldSerializer(kryo, type);
+    fld.removeField(fieldName);
+    kryo.register(type, fld);
+  }
+
+  /**
+   * Kryo serializer for timestamp.
+   */
+  private static class TimestampSerializer extends
+      com.esotericsoftware.kryo.Serializer<Timestamp> {
+
+    @Override
+    public Timestamp read(Kryo kryo, Input input, Class<Timestamp> clazz) {
+      Timestamp ts = new Timestamp(input.readLong());
+      ts.setNanos(input.readInt());
+      return ts;
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, Timestamp ts) {
+      output.writeLong(ts.getTime());
+      output.writeInt(ts.getNanos());
+    }
+  }
+
+  /**
+   * Custom Kryo serializer for sql date, otherwise Kryo gets confused between
+   * java.sql.Date and java.util.Date while deserializing
+   */
+  private static class SqlDateSerializer extends
+      com.esotericsoftware.kryo.Serializer<java.sql.Date> {
+
+    @Override
+    public java.sql.Date read(Kryo kryo, Input input, Class<java.sql.Date> clazz) {
+      return new java.sql.Date(input.readLong());
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, java.sql.Date sqlDate) {
+      output.writeLong(sqlDate.getTime());
+    }
+  }
+
+  private static class PathSerializer extends com.esotericsoftware.kryo.Serializer<Path> {
+
+    @Override
+    public void write(Kryo kryo, Output output, Path path) {
+      output.writeString(path.toUri().toString());
+    }
+
+    @Override
+    public Path read(Kryo kryo, Input input, Class<Path> type) {
+      return new Path(URI.create(input.readString()));
+    }
+  }
+
+  /**
+   * A kryo {@link Serializer} for lists created via {@link Arrays#asList(Object...)}.
+   * <p>
+   * Note: This serializer does not support cyclic references, so if one of the objects
+   * gets set the list as attribute this might cause an error during deserialization.
+   * </p>
+   * <p/>
+   * This is from kryo-serializers package. Added explicitly to avoid classpath issues.
+   */
+  private static class ArraysAsListSerializer
+      extends com.esotericsoftware.kryo.Serializer<List<?>> {
+
+    private Field _arrayField;
+
+    public ArraysAsListSerializer() {
+      try {
+        _arrayField = Class.forName("java.util.Arrays$ArrayList").getDeclaredField("a");
+        _arrayField.setAccessible(true);
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
+      // Immutable causes #copy(obj) to return the original object
+      setImmutable(true);
+    }
+
+    @Override
+    public List<?> read(final Kryo kryo, final Input input, final Class<List<?>> type) {
+      final int length = input.readInt(true);
+      Class<?> componentType = kryo.readClass(input).getType();
+      if (componentType.isPrimitive()) {
+        componentType = getPrimitiveWrapperClass(componentType);
+      }
+      try {
+        final Object items = Array.newInstance(componentType, length);
+        for (int i = 0; i < length; i++) {
+          Array.set(items, i, kryo.readClassAndObject(input));
+        }
+        return Arrays.asList((Object[]) items);
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final List<?> obj) {
+      try {
+        final Object[] array = (Object[]) _arrayField.get(obj);
+        output.writeInt(array.length, true);
+        final Class<?> componentType = array.getClass().getComponentType();
+        kryo.writeClass(output, componentType);
+        for (final Object item : array) {
+          kryo.writeClassAndObject(output, item);
+        }
+      } catch (final RuntimeException e) {
+        // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
+        // handles SerializationException specifically (resizing the buffer)...
+        throw e;
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private Class<?> getPrimitiveWrapperClass(final Class<?> c) {
+      if (c.isPrimitive()) {
+        if (c.equals(Long.TYPE)) {
+          return Long.class;
+        } else if (c.equals(Integer.TYPE)) {
+          return Integer.class;
+        } else if (c.equals(Double.TYPE)) {
+          return Double.class;
+        } else if (c.equals(Float.TYPE)) {
+          return Float.class;
+        } else if (c.equals(Boolean.TYPE)) {
+          return Boolean.class;
+        } else if (c.equals(Character.TYPE)) {
+          return Character.class;
+        } else if (c.equals(Short.TYPE)) {
+          return Short.class;
+        } else if (c.equals(Byte.TYPE)) {
+          return Byte.class;
+        }
+      }
+      return c;
+    }
+  }
+
+
+  /**
+   * Serializes the plan.
+   *
+   * @param plan The plan, such as QueryPlan, MapredWork, etc.
+   * @param out  The stream to write to.
+   * @param conf to pick which serialization format is desired.
+   */
+  public static void serializePlan(Object plan, OutputStream out, Configuration conf) {
+    serializePlan(plan, out, conf, false);
+  }
+
+  public static void serializePlan(Kryo kryo, Object plan, OutputStream out, Configuration conf) {
+    serializePlan(kryo, plan, out, conf, false);
+  }
+
+  private static void serializePlan(Object plan, OutputStream out, Configuration conf,
+      boolean cloningPlan) {
+    Kryo kryo = borrowKryo();
+    try {
+      serializePlan(kryo, plan, out, conf, cloningPlan);
+    } finally {
+      releaseKryo(kryo);
+    }
+  }
+
+  private static void serializePlan(Kryo kryo, Object plan, OutputStream out, Configuration conf,
+      boolean cloningPlan) {
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
+    String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo");
+    LOG.info("Serializing " + plan.getClass().getSimpleName() + " via " + serializationType);
+    if ("javaXML".equalsIgnoreCase(serializationType)) {
+      serializeObjectByJavaXML(plan, out);
+    } else {
+      if (cloningPlan) {
+        serializeObjectByKryo(kryo, plan, out);
+      } else {
+        serializeObjectByKryo(kryo, plan, out);
+      }
+    }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
+  }
+
+  /**
+   * Deserializes the plan.
+   *
+   * @param in        The stream to read from.
+   * @param planClass class of plan
+   * @param conf      configuration
+   * @return The plan, such as QueryPlan, MapredWork, etc.
+   */
+  public static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf) {
+    return deserializePlan(in, planClass, conf, false);
+  }
+
+  public static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass,
+      Configuration conf) {
+    return deserializePlan(kryo, in, planClass, conf, false);
+  }
+
+  private static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf,
+      boolean cloningPlan) {
+    Kryo kryo = borrowKryo();
+    T result = null;
+    try {
+      result = deserializePlan(kryo, in, planClass, conf, cloningPlan);
+    } finally {
+      releaseKryo(kryo);
+    }
+    return result;
+  }
+
+  private static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass,
+      Configuration conf, boolean cloningPlan) {
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
+    T plan;
+    String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo");
+    LOG.info("Deserializing " + planClass.getSimpleName() + " via " + serializationType);
+    if ("javaXML".equalsIgnoreCase(serializationType)) {
+      plan = deserializeObjectByJavaXML(in);
+    } else {
+      if (cloningPlan) {
+        plan = deserializeObjectByKryo(kryo, in, planClass);
+      } else {
+        plan = deserializeObjectByKryo(kryo, in, planClass);
+      }
+    }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
+    return plan;
+  }
+
+  /**
+   * Clones using the powers of XML. Do not use unless necessary.
+   * @param plan The plan.
+   * @return The clone.
+   */
+  public static MapredWork clonePlan(MapredWork plan) {
+    // TODO: need proper clone. Meanwhile, let's at least keep this horror in one place
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    Configuration conf = new HiveConf();
+    serializePlan(plan, baos, conf, true);
+    MapredWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+        MapredWork.class, conf, true);
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
+    return newPlan;
+  }
+
+  /**
+   * Clones using the powers of XML. Do not use unless necessary.
+   * @param plan The plan.
+   * @return The clone.
+   */
+  public static BaseWork cloneBaseWork(BaseWork plan) {
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    Configuration conf = new HiveConf();
+    serializePlan(plan, baos, conf, true);
+    BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+        plan.getClass(), conf, true);
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
+    return newPlan;
+  }
+
+  /**
+   * Serialize the object. This helper function mainly makes sure that enums,
+   * counters, etc are handled properly.
+   */
+  private static void serializeObjectByJavaXML(Object plan, OutputStream out) {
+    XMLEncoder e = new XMLEncoder(out);
+    e.setExceptionListener(new ExceptionListener() {
+      @Override
+      public void exceptionThrown(Exception e) {
+        LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
+        throw new RuntimeException("Cannot serialize object", e);
+      }
+    });
+    // workaround for java 1.5
+    e.setPersistenceDelegate(PlanUtils.ExpressionTypes.class, new EnumDelegate());
+    e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
+    e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
+    e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
+
+    e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate());
+    e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate());
+    e.setPersistenceDelegate(CommonToken.class, new CommonTokenDelegate());
+    e.setPersistenceDelegate(Path.class, new PathDelegate());
+
+    e.writeObject(plan);
+    e.close();
+  }
+
+
+  /**
+   * Java 1.5 workaround. From http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403
+   */
+  public static class EnumDelegate extends DefaultPersistenceDelegate {
+    @Override
+    protected Expression instantiate(Object oldInstance, Encoder out) {
+      return new Expression(Enum.class, "valueOf", new Object[] {oldInstance.getClass(),
+          ((Enum<?>) oldInstance).name()});
+    }
+
+    @Override
+    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+      return oldInstance == newInstance;
+    }
+  }
+
+  public static class MapDelegate extends DefaultPersistenceDelegate {
+    @Override
+    protected Expression instantiate(Object oldInstance, Encoder out) {
+      Map oldMap = (Map) oldInstance;
+      HashMap newMap = new HashMap(oldMap);
+      return new Expression(newMap, HashMap.class, "new", new Object[] {});
+    }
+
+    @Override
+    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+      return false;
+    }
+
+    @Override
+    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+      java.util.Collection oldO = (java.util.Collection) oldInstance;
+      java.util.Collection newO = (java.util.Collection) newInstance;
+
+      if (newO.size() != 0) {
+        out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
+      }
+      for (Iterator i = oldO.iterator(); i.hasNext();) {
+        out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
+      }
+    }
+  }
+
+  public static class SetDelegate extends DefaultPersistenceDelegate {
+    @Override
+    protected Expression instantiate(Object oldInstance, Encoder out) {
+      Set oldSet = (Set) oldInstance;
+      HashSet newSet = new HashSet(oldSet);
+      return new Expression(newSet, HashSet.class, "new", new Object[] {});
+    }
+
+    @Override
+    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+      return false;
+    }
+
+    @Override
+    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+      java.util.Collection oldO = (java.util.Collection) oldInstance;
+      java.util.Collection newO = (java.util.Collection) newInstance;
+
+      if (newO.size() != 0) {
+        out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
+      }
+      for (Iterator i = oldO.iterator(); i.hasNext();) {
+        out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
+      }
+    }
+
+  }
+
+  public static class ListDelegate extends DefaultPersistenceDelegate {
+    @Override
+    protected Expression instantiate(Object oldInstance, Encoder out) {
+      List oldList = (List) oldInstance;
+      ArrayList newList = new ArrayList(oldList);
+      return new Expression(newList, ArrayList.class, "new", new Object[] {});
+    }
+
+    @Override
+    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+      return false;
+    }
+
+    @Override
+    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+      java.util.Collection oldO = (java.util.Collection) oldInstance;
+      java.util.Collection newO = (java.util.Collection) newInstance;
+
+      if (newO.size() != 0) {
+        out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
+      }
+      for (Iterator i = oldO.iterator(); i.hasNext();) {
+        out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
+      }
+    }
+
+  }
+
+  /**
+   * DatePersistenceDelegate. Needed to serialize java.util.Date
+   * since it is not serialization friendly.
+   * Also works for java.sql.Date since it derives from java.util.Date.
+   */
+  public static class DatePersistenceDelegate extends PersistenceDelegate {
+
+    @Override
+    protected Expression instantiate(Object oldInstance, Encoder out) {
+      Date dateVal = (Date)oldInstance;
+      Object[] args = { dateVal.getTime() };
+      return new Expression(dateVal, dateVal.getClass(), "new", args);
+    }
+
+    @Override
+    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+      if (oldInstance == null || newInstance == null) {
+        return false;
+      }
+      return oldInstance.getClass() == newInstance.getClass();
+    }
+  }
+
+  /**
+   * TimestampPersistenceDelegate. Needed to serialize java.sql.Timestamp since
+   * it is not serialization friendly.
+   */
+  public static class TimestampPersistenceDelegate extends DatePersistenceDelegate {
+    @Override
+    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+      Timestamp ts = (Timestamp)oldInstance;
+      Object[] args = { ts.getNanos() };
+      Statement stmt = new Statement(oldInstance, "setNanos", args);
+      out.writeStatement(stmt);
+    }
+  }
+
+  /**
+   * Need to serialize org.antlr.runtime.CommonToken
+   */
+  public static class CommonTokenDelegate extends PersistenceDelegate {
+    @Override
+    protected Expression instantiate(Object oldInstance, Encoder out) {
+      CommonToken ct = (CommonToken)oldInstance;
+      Object[] args = {ct.getType(), ct.getText()};
+      return new Expression(ct, ct.getClass(), "new", args);
+    }
+  }
+
+  public static class PathDelegate extends PersistenceDelegate {
+    @Override
+    protected Expression instantiate(Object oldInstance, Encoder out) {
+      Path p = (Path)oldInstance;
+      Object[] args = {p.toString()};
+      return new Expression(p, p.getClass(), "new", args);
+    }
+  }
+
+  /**
+   * @param plan Usually of type MapredWork, MapredLocalWork etc.
+   * @param out stream in which serialized plan is written into
+   */
+  private static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) {
+    Output output = new Output(out);
+    kryo.setClassLoader(Utilities.getSessionSpecifiedClassLoader());
+    kryo.writeObject(output, plan);
+    output.close();
+  }
+
+  /**
+   * De-serialize an object. This helper function mainly makes sure that enums,
+   * counters, etc are handled properly.
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> T deserializeObjectByJavaXML(InputStream in) {
+    XMLDecoder d = null;
+    try {
+      d = new XMLDecoder(in, null, null);
+      return (T) d.readObject();
+    } finally {
+      if (null != d) {
+        d.close();
+      }
+    }
+  }
+
+  private static <T> T deserializeObjectByKryo(Kryo kryo, InputStream in, Class<T> clazz ) {
+    Input inp = new Input(in);
+    kryo.setClassLoader(Utilities.getSessionSpecifiedClassLoader());
+    T t = kryo.readObject(inp,clazz);
+    inp.close();
+    return t;
+  }
+
+  public static List<Operator<?>> cloneOperatorTree(Configuration conf, List<Operator<?>> roots) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    serializePlan(roots, baos, conf, true);
+    @SuppressWarnings("unchecked")
+    List<Operator<?>> result =
+        deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+            roots.getClass(), conf, true);
+    return result;
+  }
+
+  /**
+   * Serializes expression via Kryo.
+   * @param expr Expression.
+   * @return Bytes.
+   */
+  public static byte[] serializeExpressionToKryo(ExprNodeGenericFuncDesc expr) {
+    return serializeObjectToKryo(expr);
+  }
+
+  /**
+   * Deserializes expression from Kryo.
+   * @param bytes Bytes containing the expression.
+   * @return Expression; null if deserialization succeeded, but the result type is incorrect.
+   */
+  public static ExprNodeGenericFuncDesc deserializeExpressionFromKryo(byte[] bytes) {
+    return deserializeObjectFromKryo(bytes, ExprNodeGenericFuncDesc.class);
+  }
+
+  public static String serializeExpression(ExprNodeGenericFuncDesc expr) {
+    try {
+      return new String(Base64.encodeBase64(serializeExpressionToKryo(expr)), "UTF-8");
+    } catch (UnsupportedEncodingException ex) {
+      throw new RuntimeException("UTF-8 support required", ex);
+    }
+  }
+
+  public static ExprNodeGenericFuncDesc deserializeExpression(String s) {
+    byte[] bytes;
+    try {
+      bytes = Base64.decodeBase64(s.getBytes("UTF-8"));
+    } catch (UnsupportedEncodingException ex) {
+      throw new RuntimeException("UTF-8 support required", ex);
+    }
+    return deserializeExpressionFromKryo(bytes);
+  }
+
+  private static byte[] serializeObjectToKryo(Serializable object) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Output output = new Output(baos);
+    Kryo kryo = borrowKryo();
+    try {
+      kryo.writeObject(output, object);
+    } finally {
+      releaseKryo(kryo);
+    }
+    output.close();
+    return baos.toByteArray();
+  }
+
+  private static <T extends Serializable> T deserializeObjectFromKryo(byte[] bytes, Class<T> clazz) {
+    Input inp = new Input(new ByteArrayInputStream(bytes));
+    Kryo kryo = borrowKryo();
+    T func = null;
+    try {
+      func = kryo.readObject(inp, clazz);
+    } finally {
+      releaseKryo(kryo);
+    }
+    inp.close();
+    return func;
+  }
+
+  public static String serializeObject(Serializable expr) {
+    try {
+      return new String(Base64.encodeBase64(serializeObjectToKryo(expr)), "UTF-8");
+    } catch (UnsupportedEncodingException ex) {
+      throw new RuntimeException("UTF-8 support required", ex);
+    }
+  }
+
+  public static <T extends Serializable> T deserializeObject(String s, Class<T> clazz) {
+    try {
+      return deserializeObjectFromKryo(Base64.decodeBase64(s.getBytes("UTF-8")), clazz);
+    } catch (UnsupportedEncodingException ex) {
+      throw new RuntimeException("UTF-8 support required", ex);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index dacb80f..c01994f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -20,12 +20,8 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.beans.DefaultPersistenceDelegate;
 import java.beans.Encoder;
-import java.beans.ExceptionListener;
 import java.beans.Expression;
-import java.beans.PersistenceDelegate;
 import java.beans.Statement;
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
@@ -36,29 +32,22 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.net.URLDecoder;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLTransientException;
-import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -83,12 +72,10 @@ import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
-import org.antlr.runtime.CommonToken;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
 import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -150,22 +137,16 @@ import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.api.Adjacency;
@@ -181,9 +162,6 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantMapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -209,14 +187,10 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Shell;
 import org.apache.hive.common.util.ReflectionUtil;
-import org.objenesis.strategy.StdInstantiatorStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
 import com.google.common.base.Preconditions;
 
 /**
@@ -391,6 +365,7 @@ public final class Utilities {
   private static BaseWork getBaseWork(Configuration conf, String name) {
     Path path = null;
     InputStream in = null;
+    Kryo kryo = SerializationUtilities.borrowKryo();
     try {
       String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
       if (engine.equals("spark")) {
@@ -401,7 +376,7 @@ public final class Utilities {
           ClassLoader loader = Thread.currentThread().getContextClassLoader();
           ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
           Thread.currentThread().setContextClassLoader(newLoader);
-          runtimeSerializationKryo.get().setClassLoader(newLoader);
+          kryo.setClassLoader(newLoader);
         }
       }
 
@@ -410,16 +385,7 @@ public final class Utilities {
       assert path != null;
       BaseWork gWork = gWorkMap.get(conf).get(path);
       if (gWork == null) {
-        Path localPath;
-        if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
-          localPath = new Path(name);
-        } else if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
-          localPath = path;
-        } else {
-          LOG.debug("***************non-local mode***************");
-          localPath = new Path(name);
-        }
-        localPath = path;
+        Path localPath = path;
         LOG.debug("local path = " + localPath);
         if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
           LOG.debug("Loading plan from string: "+path.toUri().getPath());
@@ -438,29 +404,29 @@ public final class Utilities {
 
         if(MAP_PLAN_NAME.equals(name)){
           if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
-            gWork = deserializePlan(in, MapWork.class, conf);
+            gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class, conf);
           } else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
-            gWork = deserializePlan(in, MergeFileWork.class, conf);
+            gWork = SerializationUtilities.deserializePlan(kryo, in, MergeFileWork.class, conf);
           } else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
-            gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
+            gWork = SerializationUtilities.deserializePlan(kryo, in, ColumnTruncateWork.class, conf);
           } else if(PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
-            gWork = deserializePlan(in, PartialScanWork.class,conf);
+            gWork = SerializationUtilities.deserializePlan(kryo, in, PartialScanWork.class,conf);
           } else {
             throw new RuntimeException("unable to determine work from configuration ."
                 + MAPRED_MAPPER_CLASS + " was "+ conf.get(MAPRED_MAPPER_CLASS)) ;
           }
         } else if (REDUCE_PLAN_NAME.equals(name)) {
           if(ExecReducer.class.getName().equals(conf.get(MAPRED_REDUCER_CLASS))) {
-            gWork = deserializePlan(in, ReduceWork.class, conf);
+            gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class, conf);
           } else {
             throw new RuntimeException("unable to determine work from configuration ."
                 + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
           }
         } else if (name.contains(MERGE_PLAN_NAME)) {
           if (name.startsWith(MAPNAME)) {
-            gWork = deserializePlan(in, MapWork.class, conf);
+            gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class, conf);
           } else if (name.startsWith(REDUCENAME)) {
-            gWork = deserializePlan(in, ReduceWork.class, conf);
+            gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class, conf);
           } else {
             throw new RuntimeException("Unknown work type: " + name);
           }
@@ -480,6 +446,7 @@ public final class Utilities {
       LOG.error(msg, e);
       throw new RuntimeException(msg, e);
     } finally {
+      SerializationUtilities.releaseKryo(kryo);
       if (in != null) {
         try {
           in.close();
@@ -523,163 +490,6 @@ public final class Utilities {
     return ret;
   }
 
-  /**
-   * Java 1.5 workaround. From http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403
-   */
-  public static class EnumDelegate extends DefaultPersistenceDelegate {
-    @Override
-    protected Expression instantiate(Object oldInstance, Encoder out) {
-      return new Expression(Enum.class, "valueOf", new Object[] {oldInstance.getClass(),
-          ((Enum<?>) oldInstance).name()});
-    }
-
-    @Override
-    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
-      return oldInstance == newInstance;
-    }
-  }
-
-  public static class MapDelegate extends DefaultPersistenceDelegate {
-    @Override
-    protected Expression instantiate(Object oldInstance, Encoder out) {
-      Map oldMap = (Map) oldInstance;
-      HashMap newMap = new HashMap(oldMap);
-      return new Expression(newMap, HashMap.class, "new", new Object[] {});
-    }
-
-    @Override
-    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
-      return false;
-    }
-
-    @Override
-    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
-      java.util.Collection oldO = (java.util.Collection) oldInstance;
-      java.util.Collection newO = (java.util.Collection) newInstance;
-
-      if (newO.size() != 0) {
-        out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
-      }
-      for (Iterator i = oldO.iterator(); i.hasNext();) {
-        out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
-      }
-    }
-  }
-
-  public static class SetDelegate extends DefaultPersistenceDelegate {
-    @Override
-    protected Expression instantiate(Object oldInstance, Encoder out) {
-      Set oldSet = (Set) oldInstance;
-      HashSet newSet = new HashSet(oldSet);
-      return new Expression(newSet, HashSet.class, "new", new Object[] {});
-    }
-
-    @Override
-    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
-      return false;
-    }
-
-    @Override
-    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
-      java.util.Collection oldO = (java.util.Collection) oldInstance;
-      java.util.Collection newO = (java.util.Collection) newInstance;
-
-      if (newO.size() != 0) {
-        out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
-      }
-      for (Iterator i = oldO.iterator(); i.hasNext();) {
-        out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
-      }
-    }
-
-  }
-
-  public static class ListDelegate extends DefaultPersistenceDelegate {
-    @Override
-    protected Expression instantiate(Object oldInstance, Encoder out) {
-      List oldList = (List) oldInstance;
-      ArrayList newList = new ArrayList(oldList);
-      return new Expression(newList, ArrayList.class, "new", new Object[] {});
-    }
-
-    @Override
-    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
-      return false;
-    }
-
-    @Override
-    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
-      java.util.Collection oldO = (java.util.Collection) oldInstance;
-      java.util.Collection newO = (java.util.Collection) newInstance;
-
-      if (newO.size() != 0) {
-        out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
-      }
-      for (Iterator i = oldO.iterator(); i.hasNext();) {
-        out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
-      }
-    }
-
-  }
-
-  /**
-   * DatePersistenceDelegate. Needed to serialize java.util.Date
-   * since it is not serialization friendly.
-   * Also works for java.sql.Date since it derives from java.util.Date.
-   */
-  public static class DatePersistenceDelegate extends PersistenceDelegate {
-
-    @Override
-    protected Expression instantiate(Object oldInstance, Encoder out) {
-      Date dateVal = (Date)oldInstance;
-      Object[] args = { dateVal.getTime() };
-      return new Expression(dateVal, dateVal.getClass(), "new", args);
-    }
-
-    @Override
-    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
-      if (oldInstance == null || newInstance == null) {
-        return false;
-      }
-      return oldInstance.getClass() == newInstance.getClass();
-    }
-  }
-
-  /**
-   * TimestampPersistenceDelegate. Needed to serialize java.sql.Timestamp since
-   * it is not serialization friendly.
-   */
-  public static class TimestampPersistenceDelegate extends DatePersistenceDelegate {
-    @Override
-    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
-      Timestamp ts = (Timestamp)oldInstance;
-      Object[] args = { ts.getNanos() };
-      Statement stmt = new Statement(oldInstance, "setNanos", args);
-      out.writeStatement(stmt);
-    }
-  }
-
-  /**
-   * Need to serialize org.antlr.runtime.CommonToken
-   */
-  public static class CommonTokenDelegate extends PersistenceDelegate {
-    @Override
-    protected Expression instantiate(Object oldInstance, Encoder out) {
-      CommonToken ct = (CommonToken)oldInstance;
-      Object[] args = {ct.getType(), ct.getText()};
-      return new Expression(ct, ct.getClass(), "new", args);
-    }
-  }
-
-  public static class PathDelegate extends PersistenceDelegate {
-    @Override
-    protected Expression instantiate(Object oldInstance, Encoder out) {
-      Path p = (Path)oldInstance;
-      Object[] args = {p.toString()};
-      return new Expression(p, p.getClass(), "new", args);
-    }
-  }
-
   public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) {
     String useName = conf.get(INPUT_NAME);
     if (useName == null) {
@@ -702,6 +512,7 @@ public final class Utilities {
   }
 
   private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) {
+    Kryo kryo = SerializationUtilities.borrowKryo();
     try {
       setPlanPath(conf, hiveScratchDir);
 
@@ -714,7 +525,7 @@ public final class Utilities {
         ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
         try {
           out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED));
-          serializePlan(w, out, conf);
+          SerializationUtilities.serializePlan(kryo, w, out, conf);
           out.close();
           out = null;
         } finally {
@@ -728,7 +539,7 @@ public final class Utilities {
         FileSystem fs = planPath.getFileSystem(conf);
         try {
           out = fs.create(planPath);
-          serializePlan(w, out, conf);
+          SerializationUtilities.serializePlan(kryo, w, out, conf);
           out.close();
           out = null;
         } finally {
@@ -760,6 +571,8 @@ public final class Utilities {
       String msg = "Error caching " + name + ": " + e;
       LOG.error(msg, e);
       throw new RuntimeException(msg, e);
+    } finally {
+      SerializationUtilities.releaseKryo(kryo);
     }
   }
 
@@ -790,73 +603,6 @@ public final class Utilities {
     return null;
   }
 
-  /**
-   * Serializes expression via Kryo.
-   * @param expr Expression.
-   * @return Bytes.
-   */
-  public static byte[] serializeExpressionToKryo(ExprNodeGenericFuncDesc expr) {
-    return serializeObjectToKryo(expr);
-  }
-
-  /**
-   * Deserializes expression from Kryo.
-   * @param bytes Bytes containing the expression.
-   * @return Expression; null if deserialization succeeded, but the result type is incorrect.
-   */
-  public static ExprNodeGenericFuncDesc deserializeExpressionFromKryo(byte[] bytes) {
-    return deserializeObjectFromKryo(bytes, ExprNodeGenericFuncDesc.class);
-  }
-
-  public static String serializeExpression(ExprNodeGenericFuncDesc expr) {
-    try {
-      return new String(Base64.encodeBase64(serializeExpressionToKryo(expr)), "UTF-8");
-    } catch (UnsupportedEncodingException ex) {
-      throw new RuntimeException("UTF-8 support required", ex);
-    }
-  }
-
-  public static ExprNodeGenericFuncDesc deserializeExpression(String s) {
-    byte[] bytes;
-    try {
-      bytes = Base64.decodeBase64(s.getBytes("UTF-8"));
-    } catch (UnsupportedEncodingException ex) {
-      throw new RuntimeException("UTF-8 support required", ex);
-    }
-    return deserializeExpressionFromKryo(bytes);
-  }
-
-  private static byte[] serializeObjectToKryo(Serializable object) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    Output output = new Output(baos);
-    runtimeSerializationKryo.get().writeObject(output, object);
-    output.close();
-    return baos.toByteArray();
-  }
-
-  private static <T extends Serializable> T deserializeObjectFromKryo(byte[] bytes, Class<T> clazz) {
-    Input inp = new Input(new ByteArrayInputStream(bytes));
-    T func = runtimeSerializationKryo.get().readObject(inp, clazz);
-    inp.close();
-    return func;
-  }
-
-  public static String serializeObject(Serializable expr) {
-    try {
-      return new String(Base64.encodeBase64(serializeObjectToKryo(expr)), "UTF-8");
-    } catch (UnsupportedEncodingException ex) {
-      throw new RuntimeException("UTF-8 support required", ex);
-    }
-  }
-
-  public static <T extends Serializable> T deserializeObject(String s, Class<T> clazz) {
-    try {
-      return deserializeObjectFromKryo(Base64.decodeBase64(s.getBytes("UTF-8")), clazz);
-    } catch (UnsupportedEncodingException ex) {
-      throw new RuntimeException("UTF-8 support required", ex);
-    }
-  }
-
   public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
     @Override
     protected Expression instantiate(Object oldInstance, Encoder out) {
@@ -872,415 +618,6 @@ public final class Utilities {
     }
   }
 
-  /**
-   * Kryo serializer for timestamp.
-   */
-  private static class TimestampSerializer extends
-  com.esotericsoftware.kryo.Serializer<Timestamp> {
-
-    @Override
-    public Timestamp read(Kryo kryo, Input input, Class<Timestamp> clazz) {
-      Timestamp ts = new Timestamp(input.readLong());
-      ts.setNanos(input.readInt());
-      return ts;
-    }
-
-    @Override
-    public void write(Kryo kryo, Output output, Timestamp ts) {
-      output.writeLong(ts.getTime());
-      output.writeInt(ts.getNanos());
-    }
-  }
-
-   /** Custom Kryo serializer for sql date, otherwise Kryo gets confused between
-   java.sql.Date and java.util.Date while deserializing
-   */
-  private static class SqlDateSerializer extends
-    com.esotericsoftware.kryo.Serializer<java.sql.Date> {
-
-    @Override
-    public java.sql.Date read(Kryo kryo, Input input, Class<java.sql.Date> clazz) {
-      return new java.sql.Date(input.readLong());
-    }
-
-    @Override
-    public void write(Kryo kryo, Output output, java.sql.Date sqlDate) {
-      output.writeLong(sqlDate.getTime());
-    }
-  }
-
-  private static class CommonTokenSerializer extends com.esotericsoftware.kryo.Serializer<CommonToken> {
-    @Override
-    public CommonToken read(Kryo kryo, Input input, Class<CommonToken> clazz) {
-      return new CommonToken(input.readInt(), input.readString());
-    }
-
-    @Override
-  public void write(Kryo kryo, Output output, CommonToken token) {
-      output.writeInt(token.getType());
-      output.writeString(token.getText());
-    }
-  }
-
-  private static class PathSerializer extends com.esotericsoftware.kryo.Serializer<Path> {
-
-    @Override
-    public void write(Kryo kryo, Output output, Path path) {
-      output.writeString(path.toUri().toString());
-    }
-
-    @Override
-    public Path read(Kryo kryo, Input input, Class<Path> type) {
-      return new Path(URI.create(input.readString()));
-    }
-  }
-
-  public static List<Operator<?>> cloneOperatorTree(Configuration conf, List<Operator<?>> roots) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-    serializePlan(roots, baos, conf, true);
-    @SuppressWarnings("unchecked")
-    List<Operator<?>> result =
-        deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
-        roots.getClass(), conf, true);
-    return result;
-  }
-
-  private static void serializePlan(Object plan, OutputStream out, Configuration conf, boolean cloningPlan) {
-    PerfLogger perfLogger = SessionState.getPerfLogger();
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
-    String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo");
-    LOG.info("Serializing " + plan.getClass().getSimpleName() + " via " + serializationType);
-    if("javaXML".equalsIgnoreCase(serializationType)) {
-      serializeObjectByJavaXML(plan, out);
-    } else {
-      if(cloningPlan) {
-        serializeObjectByKryo(cloningQueryPlanKryo.get(), plan, out);
-      } else {
-        serializeObjectByKryo(runtimeSerializationKryo.get(), plan, out);
-      }
-    }
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
-  }
-  /**
-   * Serializes the plan.
-   * @param plan The plan, such as QueryPlan, MapredWork, etc.
-   * @param out The stream to write to.
-   * @param conf to pick which serialization format is desired.
-   */
-  public static void serializePlan(Object plan, OutputStream out, Configuration conf) {
-    serializePlan(plan, out, conf, false);
-  }
-
-  private static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf, boolean cloningPlan) {
-    PerfLogger perfLogger = SessionState.getPerfLogger();
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
-    T plan;
-    String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo");
-    LOG.info("Deserializing " + planClass.getSimpleName() + " via " + serializationType);
-    if("javaXML".equalsIgnoreCase(serializationType)) {
-      plan = deserializeObjectByJavaXML(in);
-    } else {
-      if(cloningPlan) {
-        plan = deserializeObjectByKryo(cloningQueryPlanKryo.get(), in, planClass);
-      } else {
-        plan = deserializeObjectByKryo(runtimeSerializationKryo.get(), in, planClass);
-      }
-    }
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
-    return plan;
-  }
-  /**
-   * Deserializes the plan.
-   * @param in The stream to read from.
-   * @param planClass class of plan
-   * @param conf configuration
-   * @return The plan, such as QueryPlan, MapredWork, etc.
-   */
-  public static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf) {
-    return deserializePlan(in, planClass, conf, false);
-  }
-
-  /**
-   * Clones using the powers of XML. Do not use unless necessary.
-   * @param plan The plan.
-   * @return The clone.
-   */
-  public static MapredWork clonePlan(MapredWork plan) {
-    // TODO: need proper clone. Meanwhile, let's at least keep this horror in one place
-    PerfLogger perfLogger = SessionState.getPerfLogger();
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-    Configuration conf = new HiveConf();
-    serializePlan(plan, baos, conf, true);
-    MapredWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
-        MapredWork.class, conf, true);
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
-    return newPlan;
-  }
-
-  /**
-   * Clones using the powers of XML. Do not use unless necessary.
-   * @param plan The plan.
-   * @return The clone.
-   */
-  public static BaseWork cloneBaseWork(BaseWork plan) {
-    PerfLogger perfLogger = SessionState.getPerfLogger();
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-    Configuration conf = new HiveConf();
-    serializePlan(plan, baos, conf, true);
-    BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
-        plan.getClass(), conf, true);
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
-    return newPlan;
-  }
-
-  /**
-   * Serialize the object. This helper function mainly makes sure that enums,
-   * counters, etc are handled properly.
-   */
-  private static void serializeObjectByJavaXML(Object plan, OutputStream out) {
-    XMLEncoder e = new XMLEncoder(out);
-    e.setExceptionListener(new ExceptionListener() {
-      @Override
-      public void exceptionThrown(Exception e) {
-        LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
-        throw new RuntimeException("Cannot serialize object", e);
-      }
-    });
-    // workaround for java 1.5
-    e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
-    e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-    e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
-    e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
-
-    e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate());
-    e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate());
-    e.setPersistenceDelegate(CommonToken.class, new CommonTokenDelegate());
-    e.setPersistenceDelegate(Path.class, new PathDelegate());
-
-    e.writeObject(plan);
-    e.close();
-  }
-
-  /**
-   * @param plan Usually of type MapredWork, MapredLocalWork etc.
-   * @param out stream in which serialized plan is written into
-   */
-  private static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) {
-    Output output = new Output(out);
-    kryo.setClassLoader(getSessionSpecifiedClassLoader());
-    kryo.writeObject(output, plan);
-    output.close();
-  }
-
-  /**
-   * De-serialize an object. This helper function mainly makes sure that enums,
-   * counters, etc are handled properly.
-   */
-  @SuppressWarnings("unchecked")
-  private static <T> T deserializeObjectByJavaXML(InputStream in) {
-    XMLDecoder d = null;
-    try {
-      d = new XMLDecoder(in, null, null);
-      return (T) d.readObject();
-    } finally {
-      if (null != d) {
-        d.close();
-      }
-    }
-  }
-
-  private static <T> T deserializeObjectByKryo(Kryo kryo, InputStream in, Class<T> clazz ) {
-    Input inp = new Input(in);
-    kryo.setClassLoader(getSessionSpecifiedClassLoader());
-    T t = kryo.readObject(inp,clazz);
-    inp.close();
-    return t;
-  }
-
-  // Kryo is not thread-safe,
-  // Also new Kryo() is expensive, so we want to do it just once.
-  public static ThreadLocal<Kryo>
-      runtimeSerializationKryo = new ThreadLocal<Kryo>() {
-    @Override
-    protected Kryo initialValue() {
-      Kryo kryo = new Kryo();
-      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
-      kryo.register(java.sql.Date.class, new SqlDateSerializer());
-      kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
-      kryo.register(Path.class, new PathSerializer());
-      kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
-      ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(
-          new StdInstantiatorStrategy());
-      removeField(kryo, Operator.class, "colExprMap");
-      removeField(kryo, AbstractOperatorDesc.class, "statistics");
-      kryo.register(MapWork.class);
-      kryo.register(ReduceWork.class);
-      kryo.register(TableDesc.class);
-      kryo.register(UnionOperator.class);
-      kryo.register(FileSinkOperator.class);
-      kryo.register(HiveIgnoreKeyTextOutputFormat.class);
-      kryo.register(StandardConstantListObjectInspector.class);
-      kryo.register(StandardConstantMapObjectInspector.class);
-      kryo.register(StandardConstantStructObjectInspector.class);
-      kryo.register(SequenceFileInputFormat.class);
-      kryo.register(HiveSequenceFileOutputFormat.class);
-      return kryo;
-    };
-  };
-  @SuppressWarnings("rawtypes")
-  protected static void removeField(Kryo kryo, Class type, String fieldName) {
-    FieldSerializer fld = new FieldSerializer(kryo, type);
-    fld.removeField(fieldName);
-    kryo.register(type, fld);
-  }
-
-  public static ThreadLocal<Kryo> sparkSerializationKryo = new ThreadLocal<Kryo>() {
-    @Override
-    protected synchronized Kryo initialValue() {
-      Kryo kryo = new Kryo();
-      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
-      kryo.register(java.sql.Date.class, new SqlDateSerializer());
-      kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
-      kryo.register(Path.class, new PathSerializer());
-      kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
-      ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-      removeField(kryo, Operator.class, "colExprMap");
-      removeField(kryo, ColumnInfo.class, "objectInspector");
-      removeField(kryo, AbstractOperatorDesc.class, "statistics");
-      kryo.register(SparkEdgeProperty.class);
-      kryo.register(MapWork.class);
-      kryo.register(ReduceWork.class);
-      kryo.register(SparkWork.class);
-      kryo.register(TableDesc.class);
-      kryo.register(Pair.class);
-      kryo.register(UnionOperator.class);
-      kryo.register(FileSinkOperator.class);
-      kryo.register(HiveIgnoreKeyTextOutputFormat.class);
-      kryo.register(StandardConstantListObjectInspector.class);
-      kryo.register(StandardConstantMapObjectInspector.class);
-      kryo.register(StandardConstantStructObjectInspector.class);
-      kryo.register(SequenceFileInputFormat.class);
-      kryo.register(HiveSequenceFileOutputFormat.class);
-      return kryo;
-    };
-  };
-
-  private static ThreadLocal<Kryo> cloningQueryPlanKryo = new ThreadLocal<Kryo>() {
-    @Override
-    protected Kryo initialValue() {
-      Kryo kryo = new Kryo();
-      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
-      kryo.register(CommonToken.class, new CommonTokenSerializer());
-      kryo.register(java.sql.Date.class, new SqlDateSerializer());
-      kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
-      kryo.register(Path.class, new PathSerializer());
-      kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
-      ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(
-          new StdInstantiatorStrategy());
-      removeField(kryo, Operator.class, "colExprMap");
-      removeField(kryo, AbstractOperatorDesc.class, "statistics");
-      kryo.register(MapWork.class);
-      kryo.register(ReduceWork.class);
-      kryo.register(TableDesc.class);
-      kryo.register(UnionOperator.class);
-      kryo.register(FileSinkOperator.class);
-      kryo.register(HiveIgnoreKeyTextOutputFormat.class);
-      kryo.register(StandardConstantListObjectInspector.class);
-      kryo.register(StandardConstantMapObjectInspector.class);
-      kryo.register(StandardConstantStructObjectInspector.class);
-      kryo.register(SequenceFileInputFormat.class);
-      kryo.register(HiveSequenceFileOutputFormat.class);
-      return kryo;
-    };
-  };
-
-  /**
-   * A kryo {@link Serializer} for lists created via {@link Arrays#asList(Object...)}.
-   * <p>
-   * Note: This serializer does not support cyclic references, so if one of the objects
-   * gets set the list as attribute this might cause an error during deserialization.
-   * </p>
-   *
-   * This is from kryo-serializers package. Added explicitly to avoid classpath issues.
-   */
-  private static class ArraysAsListSerializer extends com.esotericsoftware.kryo.Serializer<List<?>> {
-
-    private Field _arrayField;
-
-    public ArraysAsListSerializer() {
-      try {
-        _arrayField = Class.forName( "java.util.Arrays$ArrayList" ).getDeclaredField( "a" );
-        _arrayField.setAccessible( true );
-      } catch ( final Exception e ) {
-        throw new RuntimeException( e );
-      }
-      // Immutable causes #copy(obj) to return the original object
-      setImmutable(true);
-    }
-
-    @Override
-    public List<?> read(final Kryo kryo, final Input input, final Class<List<?>> type) {
-      final int length = input.readInt(true);
-      Class<?> componentType = kryo.readClass( input ).getType();
-      if (componentType.isPrimitive()) {
-        componentType = getPrimitiveWrapperClass(componentType);
-      }
-      try {
-        final Object items = Array.newInstance( componentType, length );
-        for( int i = 0; i < length; i++ ) {
-          Array.set(items, i, kryo.readClassAndObject( input ));
-        }
-        return Arrays.asList( (Object[])items );
-      } catch ( final Exception e ) {
-        throw new RuntimeException( e );
-      }
-    }
-
-    @Override
-    public void write(final Kryo kryo, final Output output, final List<?> obj) {
-      try {
-        final Object[] array = (Object[]) _arrayField.get( obj );
-        output.writeInt(array.length, true);
-        final Class<?> componentType = array.getClass().getComponentType();
-        kryo.writeClass( output, componentType );
-        for( final Object item : array ) {
-          kryo.writeClassAndObject( output, item );
-        }
-      } catch ( final RuntimeException e ) {
-        // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
-        // handles SerializationException specifically (resizing the buffer)...
-        throw e;
-      } catch ( final Exception e ) {
-        throw new RuntimeException( e );
-      }
-    }
-
-    private Class<?> getPrimitiveWrapperClass(final Class<?> c) {
-      if (c.isPrimitive()) {
-        if (c.equals(Long.TYPE)) {
-          return Long.class;
-        } else if (c.equals(Integer.TYPE)) {
-          return Integer.class;
-        } else if (c.equals(Double.TYPE)) {
-          return Double.class;
-        } else if (c.equals(Float.TYPE)) {
-          return Float.class;
-        } else if (c.equals(Boolean.TYPE)) {
-          return Boolean.class;
-        } else if (c.equals(Character.TYPE)) {
-          return Character.class;
-        } else if (c.equals(Short.TYPE)) {
-          return Short.class;
-        } else if (c.equals(Byte.TYPE)) {
-          return Byte.class;
-        }
-      }
-      return c;
-    }
-  }
-
   public static TableDesc defaultTd;
   static {
     // by default we expect ^A separated strings

http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index df96d8c..2129bda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -740,12 +741,13 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     int ret;
     if (localtask) {
       memoryMXBean = ManagementFactory.getMemoryMXBean();
-      MapredLocalWork plan = Utilities.deserializePlan(pathData, MapredLocalWork.class, conf);
+      MapredLocalWork plan = SerializationUtilities.deserializePlan(pathData, MapredLocalWork.class,
+          conf);
       MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
       ret = ed.executeInProcess(new DriverContext());
 
     } else {
-      MapredWork plan = Utilities.deserializePlan(pathData, MapredWork.class, conf);
+      MapredWork plan = SerializationUtilities.deserializePlan(pathData, MapredWork.class, conf);
       ExecDriver ed = new ExecDriver(plan, conf, isSilent);
       ret = ed.execute(new DriverContext());
     }