Posted to commits@hive.apache.org by pr...@apache.org on 2015/12/08 20:08:11 UTC
[1/2] hive git commit: HIVE-12302: Use KryoPool instead of
thread-local caching (Prasanth Jayachandran reviewed by Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 560e4feba -> 2bb5e63c9
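This change replaces the per-thread Kryo caches (Utilities.runtimeSerializationKryo and Utilities.sparkSerializationKryo, both ThreadLocal<Kryo>) with a shared pool: callers borrow an instance, use it inside try/finally, and release it back. Below is a minimal sketch of what the pool-backed helpers look like, assuming Kryo's kryo-pool module; the class name PooledKryoSketch is illustrative, and the real SerializationUtilities additionally registers Hive-specific classes and serializers:

  import com.esotericsoftware.kryo.Kryo;
  import com.esotericsoftware.kryo.pool.KryoFactory;
  import com.esotericsoftware.kryo.pool.KryoPool;

  public final class PooledKryoSketch {

    // Factory invoked whenever the pool needs a fresh instance; the real
    // SerializationUtilities would also register Hive plan classes and
    // custom serializers here.
    private static final KryoFactory factory = new KryoFactory() {
      @Override
      public Kryo create() {
        return new Kryo();
      }
    };

    // softReferences() lets idle pooled instances be reclaimed by the GC
    // under memory pressure instead of staying pinned forever.
    private static final KryoPool kryoPool =
        new KryoPool.Builder(factory).softReferences().build();

    public static Kryo borrowKryo() {
      return kryoPool.borrow();
    }

    public static void releaseKryo(Kryo kryo) {
      kryoPool.release(kryo);
    }
  }

Call sites then follow the borrow/try/finally/release shape seen throughout the diff, e.g.:

  Kryo kryo = SerializationUtilities.borrowKryo();
  try {
    kryo.writeObject(output, partition.hashMap);
  } finally {
    SerializationUtilities.releaseKryo(kryo);
  }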
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index 058d63d..cb70ac8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -177,7 +178,7 @@ public class MapRedTask extends ExecDriver implements Serializable {
OutputStream out = null;
try {
out = FileSystem.getLocal(conf).create(planPath);
- Utilities.serializePlan(plan, out, conf);
+ SerializationUtilities.serializePlan(plan, out, conf);
out.close();
out = null;
} finally {
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index bfe21db..cb7dfa1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -33,6 +33,7 @@ import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -159,7 +160,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
OutputStream out = null;
try {
out = FileSystem.getLocal(conf).create(planPath);
- Utilities.serializePlan(plan, out, conf);
+ SerializationUtilities.serializePlan(plan, out, conf);
out.close();
out = null;
} finally {
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index a0c9b98..f2f3c09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -29,15 +29,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hive.common.util.HashCodeUtil;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
@@ -58,6 +55,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hive.common.util.BloomFilter;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
@@ -159,8 +159,13 @@ public class HybridHashTableContainer
} else {
InputStream inputStream = Files.newInputStream(hashMapLocalPath);
com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream);
- Kryo kryo = Utilities.runtimeSerializationKryo.get();
- BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
+ Kryo kryo = SerializationUtilities.borrowKryo();
+ BytesBytesMultiHashMap restoredHashMap = null;
+ try {
+ restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
+ }
if (rowCount > 0) {
restoredHashMap.expandAndRehashToTarget(rowCount);
@@ -551,10 +556,14 @@ public class HybridHashTableContainer
com.esotericsoftware.kryo.io.Output output =
new com.esotericsoftware.kryo.io.Output(outputStream);
- Kryo kryo = Utilities.runtimeSerializationKryo.get();
- kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap
- output.close();
- outputStream.close();
+ Kryo kryo = SerializationUtilities.borrowKryo();
+ try {
+ kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap
+ output.close();
+ outputStream.close();
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
+ }
partition.hashMapLocalPath = path;
partition.hashMapOnDisk = true;
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
index 6d391a3..a976de0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
@@ -17,21 +17,22 @@
*/
package org.apache.hadoop.hive.ql.exec.persistence;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
/**
* An eager object container that puts every row directly to output stream.
@@ -58,14 +59,11 @@ public class ObjectContainer<ROW> {
private Input input;
private Output output;
- private Kryo kryo;
-
public ObjectContainer() {
readBuffer = (ROW[]) new Object[IN_MEMORY_NUM_ROWS];
for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) {
readBuffer[i] = (ROW) new Object();
}
- kryo = Utilities.runtimeSerializationKryo.get();
try {
setupOutput();
} catch (IOException | HiveException e) {
@@ -101,7 +99,12 @@ public class ObjectContainer<ROW> {
}
public void add(ROW row) {
- kryo.writeClassAndObject(output, row);
+ Kryo kryo = SerializationUtilities.borrowKryo();
+ try {
+ kryo.writeClassAndObject(output, row);
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
+ }
rowsOnDisk++;
}
@@ -164,8 +167,13 @@ public class ObjectContainer<ROW> {
rowsInReadBuffer = rowsOnDisk;
}
- for (int i = 0; i < rowsInReadBuffer; i++) {
- readBuffer[i] = (ROW) kryo.readClassAndObject(input);
+ Kryo kryo = SerializationUtilities.borrowKryo();
+ try {
+ for (int i = 0; i < rowsInReadBuffer; i++) {
+ readBuffer[i] = (ROW) kryo.readClassAndObject(input);
+ }
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
}
if (input.eof()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
index fd7109a..d7c278a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
@@ -24,11 +24,12 @@ import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.mapred.JobConf;
+import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
@@ -39,15 +40,28 @@ public class KryoSerializer {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
Output output = new Output(stream);
- Utilities.sparkSerializationKryo.get().writeObject(output, object);
+ Kryo kryo = SerializationUtilities.borrowKryo();
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ try {
+ kryo.writeObject(output, object);
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
+ }
output.close(); // close() also calls flush()
return stream.toByteArray();
}
public static <T> T deserialize(byte[] buffer, Class<T> clazz) {
- return Utilities.sparkSerializationKryo.get().readObject(
- new Input(new ByteArrayInputStream(buffer)), clazz);
+ Kryo kryo = SerializationUtilities.borrowKryo();
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ T result = null;
+ try {
+ result = kryo.readObject(new Input(new ByteArrayInputStream(buffer)), clazz);
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
+ }
+ return result;
}
public static byte[] serializeJobConf(JobConf jobConf) {
@@ -80,8 +94,4 @@ public class KryoSerializer {
return conf;
}
- public static void setClassLoader(ClassLoader classLoader) {
- Utilities.sparkSerializationKryo.get().setClassLoader(classLoader);
- }
-
}
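Note that with pooling, a Kryo instance is no longer tied to one thread, so the Spark path above sets the context classloader on each borrowed instance inside serialize()/deserialize() rather than once through the removed static setClassLoader() helper; RemoteHiveSparkClient correspondingly drops its setClassLoader() call in the next hunk.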
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index c4cb2ba..6380774 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -298,7 +298,6 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
Map<String, Long> addedJars = jc.getAddedJars();
if (addedJars != null && !addedJars.isEmpty()) {
SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
- KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader());
localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 3feab1a..b19c70a 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map.Entry;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
@@ -532,14 +533,14 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
if (!hasObj) {
Serializable filterObject = scanDesc.getFilterObject();
if (filterObject != null) {
- serializedFilterObj = Utilities.serializeObject(filterObject);
+ serializedFilterObj = SerializationUtilities.serializeObject(filterObject);
}
}
if (serializedFilterObj != null) {
jobConf.set(TableScanDesc.FILTER_OBJECT_CONF_STR, serializedFilterObj);
}
if (!hasExpr) {
- serializedFilterExpr = Utilities.serializeExpression(filterExpr);
+ serializedFilterExpr = SerializationUtilities.serializeExpression(filterExpr);
}
String filterText = filterExpr.getExprString();
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
index 13390de..017676b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
@@ -128,7 +129,7 @@ public class ProjectionPusher {
}
final String filterText = filterExpr.getExprString();
- final String filterExprSerialized = Utilities.serializeExpression(filterExpr);
+ final String filterExprSerialized = SerializationUtilities.serializeExpression(filterExpr);
jobConf.set(
TableScanDesc.FILTER_TEXT_CONF_STR,
filterText);
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
index 7e888bc..6d3a134 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
@@ -23,11 +23,9 @@ import java.sql.Timestamp;
import java.util.List;
import org.apache.commons.codec.binary.Base64;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -51,6 +49,8 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
@@ -433,7 +433,7 @@ public class ConvertAstToSearchArg {
public static SearchArgument createFromConf(Configuration conf) {
String sargString;
if ((sargString = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR)) != null) {
- return create(Utilities.deserializeExpression(sargString));
+ return create(SerializationUtilities.deserializeExpression(sargString));
} else if ((sargString = conf.get(SARG_PUSHDOWN)) != null) {
return create(sargString);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 44189ef..c682df2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -18,10 +18,32 @@
package org.apache.hadoop.hive.ql.metadata;
-import com.google.common.collect.Sets;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
+import static org.apache.hadoop.hive.serde.serdeConstants.COLLECTION_DELIM;
+import static org.apache.hadoop.hive.serde.serdeConstants.ESCAPE_CHAR;
+import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
+import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM;
+import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
+import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -81,6 +103,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.FunctionTask;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
@@ -100,32 +123,10 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
-import static org.apache.hadoop.hive.serde.serdeConstants.COLLECTION_DELIM;
-import static org.apache.hadoop.hive.serde.serdeConstants.ESCAPE_CHAR;
-import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
-import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM;
-import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
-import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
-import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
+import com.google.common.collect.Sets;
/**
* This class has functions that implement meta data/DDL operations using calls
@@ -2087,7 +2088,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
new ArrayList<ObjectPair<Integer,byte[]>>(partSpecs.size());
for (DropTableDesc.PartSpec partSpec : partSpecs) {
partExprs.add(new ObjectPair<Integer, byte[]>(partSpec.getPrefixLength(),
- Utilities.serializeExpressionToKryo(partSpec.getPartSpec())));
+ SerializationUtilities.serializeExpressionToKryo(partSpec.getPartSpec())));
}
List<org.apache.hadoop.hive.metastore.api.Partition> tParts = getMSC().dropPartitions(
dbName, tblName, partExprs, dropOptions);
@@ -2362,7 +2363,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
public boolean getPartitionsByExpr(Table tbl, ExprNodeGenericFuncDesc expr, HiveConf conf,
List<Partition> result) throws HiveException, TException {
assert result != null;
- byte[] exprBytes = Utilities.serializeExpressionToKryo(expr);
+ byte[] exprBytes = SerializationUtilities.serializeExpressionToKryo(expr);
String defaultPartitionName = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME);
List<org.apache.hadoop.hive.metastore.api.Partition> msParts =
new ArrayList<org.apache.hadoop.hive.metastore.api.Partition>();
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
index 1f6b5d7..e9ca5fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -485,7 +486,7 @@ public class CommonJoinTaskDispatcher extends AbstractJoinTaskDispatcher impleme
}
// deep copy a new mapred work from xml
// Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan
- MapredWork newWork = Utilities.clonePlan(currTask.getWork());
+ MapredWork newWork = SerializationUtilities.clonePlan(currTask.getWork());
// create map join task and set big table as i
MapRedTask newTask = convertTaskToMapJoinTask(newWork, pos);
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index 15f0d70..a71c474 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -18,6 +18,13 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -27,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -53,13 +61,6 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* GenMRSkewJoinProcessor.
*
@@ -253,7 +254,7 @@ public final class GenMRSkewJoinProcessor {
HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS);
newPlan.setMapperCannotSpanPartns(mapperCannotSpanPartns);
- MapredWork clonePlan = Utilities.clonePlan(currPlan);
+ MapredWork clonePlan = SerializationUtilities.clonePlan(currPlan);
Operator<? extends OperatorDesc>[] parentOps = new TableScanOperator[tags.length];
for (int k = 0; k < tags.length; k++) {
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 895e64e..41d3522 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
-import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -62,12 +65,10 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Preconditions;
/**
* Copied from GenMRSkewJoinProcessor. It's used for spark task
@@ -254,7 +255,7 @@ public class GenSparkSkewJoinProcessor {
// this makes sure MJ has the same downstream operator plan as the original join
List<Operator<?>> reducerList = new ArrayList<Operator<?>>();
reducerList.add(reduceWork.getReducer());
- Operator<? extends OperatorDesc> reducer = Utilities.cloneOperatorTree(
+ Operator<? extends OperatorDesc> reducer = SerializationUtilities.cloneOperatorTree(
parseCtx.getConf(), reducerList).get(0);
Preconditions.checkArgument(reducer instanceof JoinOperator,
"Reducer should be join operator, but actually is " + reducer.getName());
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
index e94f6e7..dc433fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
@@ -19,29 +19,18 @@ package org.apache.hadoop.hive.ql.optimizer.physical;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.List;
+import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
import java.util.Stack;
-import java.util.TreeSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.StatsTask;
-import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -52,13 +41,14 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* SerializeFilter is a simple physical optimizer that serializes all filter expressions in
@@ -151,7 +141,7 @@ public class SerializeFilter implements PhysicalPlanResolver {
LOG.debug("Serializing: " + ts.getConf().getFilterExpr().getExprString());
}
ts.getConf().setSerializedFilterExpr(
- Utilities.serializeExpression(ts.getConf().getFilterExpr()));
+ SerializationUtilities.serializeExpression(ts.getConf().getFilterExpr()));
}
if (ts.getConf() != null && ts.getConf().getFilterObject() != null) {
@@ -160,7 +150,7 @@ public class SerializeFilter implements PhysicalPlanResolver {
}
ts.getConf().setSerializedFilterObject(
- Utilities.serializeObject(ts.getConf().getFilterObject()));
+ SerializationUtilities.serializeObject(ts.getConf().getFilterObject()));
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
index 3b09c2f..658717c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -147,7 +148,7 @@ public class SortMergeJoinTaskDispatcher extends AbstractJoinTaskDispatcher impl
throws SemanticException {
try {
// deep copy a new mapred work
- MapredWork currJoinWork = Utilities.clonePlan(currWork);
+ MapredWork currJoinWork = SerializationUtilities.clonePlan(currWork);
SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
// change the newly created map-red plan as if it was a join operator
@@ -165,7 +166,7 @@ public class SortMergeJoinTaskDispatcher extends AbstractJoinTaskDispatcher impl
SMBMapJoinOperator smbJoinOp)
throws UnsupportedEncodingException, SemanticException {
// deep copy a new mapred work
- MapredWork newWork = Utilities.clonePlan(origWork);
+ MapredWork newWork = SerializationUtilities.clonePlan(origWork);
// create a mapred task for this work
MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
.getParseContext().getConf());
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
index f9978b4..42ad04b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
@@ -22,13 +22,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.metastore.Metastore.SplitInfo;
import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcProto;
import org.apache.hadoop.hive.ql.io.orc.ReaderImpl;
@@ -38,6 +36,8 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The basic implementation of PartitionExpressionProxy that uses ql package classes.
@@ -71,7 +71,7 @@ public class PartitionExpressionForMetastore implements PartitionExpressionProxy
private ExprNodeGenericFuncDesc deserializeExpr(byte[] exprBytes) throws MetaException {
ExprNodeGenericFuncDesc expr = null;
try {
- expr = Utilities.deserializeExpressionFromKryo(exprBytes);
+ expr = SerializationUtilities.deserializeExpressionFromKryo(exprBytes);
} catch (Exception ex) {
LOG.error("Failed to deserialize the expression", ex);
throw new MetaException(ex.getMessage());
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
index fb20080..6931ad9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
@@ -30,8 +30,8 @@ import java.util.Set;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
@@ -95,7 +95,7 @@ public class SplitSparkWorkResolver implements PhysicalPlanResolver {
boolean isFirst = true;
for (BaseWork childWork : childWorks) {
- BaseWork clonedParentWork = Utilities.cloneBaseWork(parentWork);
+ BaseWork clonedParentWork = SerializationUtilities.cloneBaseWork(parentWork);
// give the cloned work a different name
clonedParentWork.setName(clonedParentWork.getName().replaceAll("^([a-zA-Z]+)(\\s+)(\\d+)",
"$1$2" + GenSparkUtils.getUtils().getNextSeqNumber()));
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 27d7276..fe0e234 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.parse;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
+
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
@@ -26,8 +28,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -55,12 +56,12 @@ import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
-
/**
* GenTezUtils is a collection of shared helper methods to produce TezWork.
* All the methods in this class should be static, but some aren't; this is to facilitate testing.
@@ -216,7 +217,7 @@ public class GenTezUtils {
roots.addAll(context.eventOperatorSet);
// need to clone the plan.
- List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+ List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(conf, roots);
// we're cloning the operator plan but we're retaining the original work. That means
// that root operators have to be replaced with the cloned ops. The replacement map
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 40c23a5..8dc48cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -28,8 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -43,9 +41,9 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
@@ -60,10 +58,12 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
/**
* GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -207,7 +207,7 @@ public class GenSparkUtils {
}
// need to clone the plan.
- List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+ List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(conf, roots);
// Build a map to map the original FileSinkOperator and the cloned FileSinkOperators
// This map is used for set the stats flag for the cloned FileSinkOperators in later process
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
index c140f67..4bb661a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
@@ -24,10 +24,10 @@ import java.util.List;
import java.util.Set;
import java.util.Stack;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
@@ -36,6 +36,8 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import com.google.common.base.Preconditions;
+
/**
* This processor triggers on SparkPartitionPruningSinkOperator. For a operator tree like
* this:
@@ -105,7 +107,8 @@ public class SplitOpTreeForDPP implements NodeProcessor {
filterOp.setChildOperators(Utilities.makeList(selOp));
// Now clone the tree above selOp
- List<Operator<?>> newRoots = Utilities.cloneOperatorTree(context.parseContext.getConf(), roots);
+ List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(
+ context.parseContext.getConf(), roots);
for (int i = 0; i < roots.size(); i++) {
TableScanOperator newTs = (TableScanOperator) newRoots.get(i);
TableScanOperator oldTs = (TableScanOperator) roots.get(i);
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
index 5c5fafa..e2aaa70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
@@ -24,17 +24,18 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
public class FSStatsAggregator implements StatsAggregator {
@@ -62,7 +63,12 @@ public class FSStatsAggregator implements StatsAggregator {
});
for (FileStatus file : status) {
Input in = new Input(fs.open(file.getPath()));
- statsMap = Utilities.runtimeSerializationKryo.get().readObject(in, statsMap.getClass());
+ Kryo kryo = SerializationUtilities.borrowKryo();
+ try {
+ statsMap = kryo.readObject(in, statsMap.getClass());
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
+ }
LOG.info("Read stats : " +statsMap);
statsList.add(statsMap);
in.close();
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
index 80f954b..e5d89e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
@@ -24,15 +24,16 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
public class FSStatsPublisher implements StatsPublisher {
@@ -100,7 +101,12 @@ public class FSStatsPublisher implements StatsPublisher {
Output output = new Output(statsFile.getFileSystem(conf).create(statsFile,true));
LOG.debug("Created file : " + statsFile);
LOG.debug("Writing stats in it : " + statsMap);
- Utilities.runtimeSerializationKryo.get().writeObject(output, statsMap);
+ Kryo kryo = SerializationUtilities.borrowKryo();
+ try {
+ kryo.writeObject(output, statsMap);
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
+ }
output.close();
return true;
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java b/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java
index d6d513d..5e53604 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java
@@ -23,8 +23,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Stack;
-import junit.framework.TestCase;
-
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -36,7 +34,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -52,6 +50,8 @@ import org.apache.thrift.TException;
import com.google.common.collect.Lists;
+import junit.framework.TestCase;
+
/**
* Tests hive metastore expression support. This should be moved in metastore module
* as soon as we are able to use ql from metastore server (requires splitting metastore
@@ -166,8 +166,8 @@ public class TestMetastoreExpr extends TestCase {
public void checkExpr(int numParts,
String dbName, String tblName, ExprNodeGenericFuncDesc expr) throws Exception {
List<Partition> parts = new ArrayList<Partition>();
- client.listPartitionsByExpr(
- dbName, tblName, Utilities.serializeExpressionToKryo(expr), null, (short)-1, parts);
+ client.listPartitionsByExpr(dbName, tblName,
+ SerializationUtilities.serializeExpressionToKryo(expr), null, (short)-1, parts);
assertEquals("Partition check failed: " + expr.getExprString(), numParts, parts.size());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
index 1364888..c1667c2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
@@ -23,8 +23,6 @@ import java.io.File;
import java.util.ArrayList;
import java.util.LinkedHashMap;
-import junit.framework.TestCase;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -37,6 +35,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.JobConf;
+import junit.framework.TestCase;
+
/**
* TestPlan.
*
@@ -83,7 +83,7 @@ public class TestPlan extends TestCase {
JobConf job = new JobConf(TestPlan.class);
// serialize the configuration once ..
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Utilities.serializePlan(mrwork, baos, job);
+ SerializationUtilities.serializePlan(mrwork, baos, job);
baos.close();
String v1 = baos.toString();
@@ -101,7 +101,7 @@ public class TestPlan extends TestCase {
// serialize again
baos.reset();
- Utilities.serializePlan(mrwork2, baos, job);
+ SerializationUtilities.serializePlan(mrwork2, baos, job);
baos.close();
// verify that the two are equal
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 028cdd1..bb6a4e1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -26,16 +26,8 @@ import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
-import junit.framework.Assert;
-import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -46,6 +38,14 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
public class TestUtilities extends TestCase {
public static final Logger LOG = LoggerFactory.getLogger(TestUtilities.class);
@@ -85,8 +85,8 @@ public class TestUtilities extends TestCase {
children.add(constant);
ExprNodeGenericFuncDesc desc = new ExprNodeGenericFuncDesc(TypeInfoFactory.timestampTypeInfo,
new GenericUDFFromUtcTimestamp(), children);
- assertEquals(desc.getExprString(), Utilities.deserializeExpression(
- Utilities.serializeExpression(desc)).getExprString());
+ assertEquals(desc.getExprString(), SerializationUtilities.deserializeExpression(
+ SerializationUtilities.serializeExpression(desc)).getExprString());
}
public void testgetDbTableName() throws HiveException{
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 9f616ab..1ff7eb5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -17,7 +17,13 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import java.io.DataInput;
import java.io.DataOutput;
@@ -51,6 +57,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -67,7 +74,6 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy;
-import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger.MyRow;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -1609,7 +1615,7 @@ public class TestInputOutputFormat {
Path mapXml = new Path(workDir, "map.xml");
localFs.delete(mapXml, true);
FSDataOutputStream planStream = localFs.create(mapXml);
- Utilities.serializePlan(mapWork, planStream, conf);
+ SerializationUtilities.serializePlan(mapWork, planStream, conf);
planStream.close();
return conf;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
index 3560c43..7a93b54 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -121,7 +121,7 @@ public class TestOrcSplitElimination {
childExpr.add(col);
childExpr.add(con);
ExprNodeGenericFuncDesc en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- String sargStr = Utilities.serializeExpression(en);
+ String sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
InputSplit[] splits = in.getSplits(conf, 1);
assertEquals(5, splits.length);
@@ -129,7 +129,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(1);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
assertEquals(0, splits.length);
@@ -137,7 +137,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(2);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
assertEquals(1, splits.length);
@@ -145,7 +145,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(5);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
assertEquals(2, splits.length);
@@ -153,7 +153,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(13);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
assertEquals(3, splits.length);
@@ -161,7 +161,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(29);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
assertEquals(4, splits.length);
@@ -169,7 +169,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(70);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
assertEquals(5, splits.length);
@@ -199,7 +199,7 @@ public class TestOrcSplitElimination {
childExpr.add(col);
childExpr.add(con);
ExprNodeGenericFuncDesc en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- String sargStr = Utilities.serializeExpression(en);
+ String sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
InputSplit[] splits = in.getSplits(conf, 1);
assertEquals(2, splits.length);
@@ -207,7 +207,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(0);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
// no stripes satisfies the condition
@@ -216,7 +216,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(2);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
// only first stripe will satisfy condition and hence single split
@@ -225,7 +225,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(5);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
// first stripe will satisfy the predicate and will be a single split, last stripe will be a
@@ -235,7 +235,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(13);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
// first 2 stripes will satisfy the predicate and merged to single split, last stripe will be a
@@ -245,7 +245,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(29);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
// first 3 stripes will satisfy the predicate and merged to single split, last stripe will be a
@@ -255,7 +255,7 @@ public class TestOrcSplitElimination {
con = new ExprNodeConstantDesc(70);
childExpr.set(1, con);
en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- sargStr = Utilities.serializeExpression(en);
+ sargStr = SerializationUtilities.serializeExpression(en);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
// first 2 stripes will satisfy the predicate and merged to single split, last two stripe will
@@ -304,7 +304,7 @@ public class TestOrcSplitElimination {
childExpr2.add(en1);
ExprNodeGenericFuncDesc en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2);
- String sargStr = Utilities.serializeExpression(en2);
+ String sargStr = SerializationUtilities.serializeExpression(en2);
conf.set("hive.io.filter.expr.serialized", sargStr);
InputSplit[] splits = in.getSplits(conf, 1);
assertEquals(2, splits.length);
@@ -321,7 +321,7 @@ public class TestOrcSplitElimination {
childExpr2.set(1, en1);
en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2);
- sargStr = Utilities.serializeExpression(en2);
+ sargStr = SerializationUtilities.serializeExpression(en2);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
// no stripe will satisfy the predicate
@@ -339,7 +339,7 @@ public class TestOrcSplitElimination {
childExpr2.set(1, en1);
en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2);
- sargStr = Utilities.serializeExpression(en2);
+ sargStr = SerializationUtilities.serializeExpression(en2);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
// only first stripe will satisfy condition and hence single split
@@ -358,7 +358,7 @@ public class TestOrcSplitElimination {
childExpr2.set(1, en1);
en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2);
- sargStr = Utilities.serializeExpression(en2);
+ sargStr = SerializationUtilities.serializeExpression(en2);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
// first two stripes will satisfy condition and hence single split
@@ -378,7 +378,7 @@ public class TestOrcSplitElimination {
childExpr2.set(1, en1);
en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2);
- sargStr = Utilities.serializeExpression(en2);
+ sargStr = SerializationUtilities.serializeExpression(en2);
conf.set("hive.io.filter.expr.serialized", sargStr);
splits = in.getSplits(conf, 1);
// only second stripes will satisfy condition and hence single split
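
Each assertion block above follows the same push-down recipe; a self-contained sketch, where the helper name, the boolean type choice, and the <= comparison are illustrative rather than taken from the patch:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper: push "col <= bound" down to ORC split generation.
static void setOrcFilter(JobConf conf, ExprNodeDesc col, int bound) {
  List<ExprNodeDesc> childExpr = new ArrayList<ExprNodeDesc>();
  childExpr.add(col);
  childExpr.add(new ExprNodeConstantDesc(bound));
  ExprNodeGenericFuncDesc en = new ExprNodeGenericFuncDesc(
      TypeInfoFactory.booleanTypeInfo, new GenericUDFOPEqualOrLessThan(), childExpr);
  // Opaque string form; the ORC reader decodes it via deserializeExpression.
  String sargStr = SerializationUtilities.serializeExpression(en);
  conf.set("hive.io.filter.expr.serialized", sargStr);
}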
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index 7204521..bf363f3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -18,12 +18,19 @@
package org.apache.hadoop.hive.ql.io.parquet;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
-import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -34,16 +41,14 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import com.google.common.collect.Lists;
public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
@@ -96,7 +101,7 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
children.add(columnDesc);
children.add(constantDesc);
ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
- String searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+ String searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper)
@@ -109,7 +114,7 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
constantDesc = new ExprNodeConstantDesc(100);
children.set(1, constantDesc);
genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
- searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+ searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
recordReader = (ParquetRecordReaderWrapper)
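
The Parquet side uses the same serialized-expression string, keyed by the TableScanDesc constant rather than the raw property name. A sketch with a hypothetical helper name:

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper: attach a filter expression for Parquet row-group pruning.
static void setParquetFilter(JobConf conf, ExprNodeGenericFuncDesc genericFuncDesc) {
  String searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
  conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
}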
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
index e72789d..a0fa700 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
@@ -22,23 +22,22 @@ import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNull;
import static junit.framework.Assert.assertTrue;
-import com.google.common.collect.Sets;
+import java.beans.XMLDecoder;
+import java.io.ByteArrayInputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Set;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.Test;
-import java.beans.XMLDecoder;
-import java.io.ByteArrayInputStream;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.parquet.filter2.predicate.FilterPredicate;
+import com.google.common.collect.Sets;
/**
* These tests cover the conversion from Hive's AST to SearchArguments.
@@ -2713,7 +2712,7 @@ public class TestConvertAstToSearchArg {
"AAABgj0BRVFVQcwBBW9yZy5hcGFjaGUuaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5Q" +
"EAAAECAQFib29sZWHu";
SearchArgument sarg =
- new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2732,7 +2731,7 @@ public class TestConvertAstToSearchArg {
"Y2hlLmhhZG9vcC5oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUH" +
"MAQVvcmcuYXBhY2hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAgEBYm9vbGVh7g==";
SearchArgument sarg =
- new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2752,7 +2751,7 @@ public class TestConvertAstToSearchArg {
"oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQZvcmcuYXBhY2" +
"hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABBAEBYm9vbGVh7g==";
SearchArgument sarg =
- new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2772,7 +2771,7 @@ public class TestConvertAstToSearchArg {
"vb3AuaGl2ZS5xbC51ZGYuZ2VuZXJpYy5HZW5lcmljVURGT1BFcXVh7AEAAAGCPQFFUVVBzAEGb3JnLm" +
"FwYWNoZS5oYWRvb3AuaW8uQm9vbGVhbldyaXRhYmzlAQAAAQQBAWJvb2xlYe4=";
SearchArgument sarg =
- new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2792,7 +2791,7 @@ public class TestConvertAstToSearchArg {
"lLmhhZG9vcC5oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQ" +
"ZvcmcuYXBhY2hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABBAEBYm9vbGVh7g==";
SearchArgument sarg =
- new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2811,7 +2810,7 @@ public class TestConvertAstToSearchArg {
"dmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QRXF1YewBAAABgj0BRVFVQcwBBW9yZy5hcGFjaGU" +
"uaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5QEAAAECAQFib29sZWHu";
SearchArgument sarg =
- new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2832,7 +2831,7 @@ public class TestConvertAstToSearchArg {
"hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAwkBAgEBYrIAAAgBAwkBB29yZy5hcGFjaGUua" +
"GFkb29wLmhpdmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QQW7kAQEGAQAAAQMJ";
SearchArgument sarg =
- new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("(and leaf-0 leaf-1)", sarg.getExpression().toString());
assertEquals(2, sarg.getLeaves().size());
@@ -2854,7 +2853,7 @@ public class TestConvertAstToSearchArg {
"aXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQVvcmcuYXBhY2h" +
"lLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAgEBYm9vbGVh7g==";
SearchArgument sarg =
- new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2873,7 +2872,7 @@ public class TestConvertAstToSearchArg {
"b29wLmhpdmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QRXF1YewBAAABgj0BRVFVQcwBBW9yZy5" +
"hcGFjaGUuaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5QEAAAECAQFib29sZWHu";
SearchArgument sarg =
- new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
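
Each test above decodes a canned base64 AST; the general round trip looks roughly like this (hypothetical helper):

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;

// Hypothetical helper: serialize an expression AST, then rebuild a SearchArgument from it.
static SearchArgument roundTrip(ExprNodeGenericFuncDesc expr) {
  String serialAst = SerializationUtilities.serializeExpression(expr);
  ExprNodeGenericFuncDesc back = SerializationUtilities.deserializeExpression(serialAst);
  return new ConvertAstToSearchArg(back).buildSearchArgument();
}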
[2/2] hive git commit: HIVE-12302: Use KryoPool instead of
thread-local caching (Prasanth Jayachandran reviewed by Ashutosh Chauhan)
Posted by pr...@apache.org.
HIVE-12302: Use KryoPool instead of thread-local caching (Prasanth Jayachandran reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2bb5e63c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2bb5e63c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2bb5e63c
Branch: refs/heads/master
Commit: 2bb5e63c96c58d81d4565e9633accbd5133b33af
Parents: 560e4fe
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Tue Dec 8 13:07:09 2015 -0600
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Tue Dec 8 13:07:09 2015 -0600
----------------------------------------------------------------------
.../predicate/AccumuloPredicateHandler.java | 4 +-
.../predicate/TestAccumuloPredicateHandler.java | 36 +-
.../hive/hbase/HiveHBaseTableInputFormat.java | 11 +-
.../hive/hcatalog/api/HCatClientHMSImpl.java | 26 +-
.../org/apache/hadoop/hive/ql/QTestUtil.java | 3 +-
.../hadoop/hive/ql/exec/MapJoinOperator.java | 6 +-
.../apache/hadoop/hive/ql/exec/PTFUtils.java | 2 +-
.../hive/ql/exec/SerializationUtilities.java | 730 +++++++++++++++++++
.../apache/hadoop/hive/ql/exec/Utilities.java | 695 +-----------------
.../hadoop/hive/ql/exec/mr/ExecDriver.java | 6 +-
.../hadoop/hive/ql/exec/mr/MapRedTask.java | 3 +-
.../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 3 +-
.../persistence/HybridHashTableContainer.java | 29 +-
.../ql/exec/persistence/ObjectContainer.java | 40 +-
.../hive/ql/exec/spark/KryoSerializer.java | 28 +-
.../ql/exec/spark/RemoteHiveSparkClient.java | 1 -
.../hadoop/hive/ql/io/HiveInputFormat.java | 5 +-
.../hive/ql/io/parquet/ProjectionPusher.java | 3 +-
.../hive/ql/io/sarg/ConvertAstToSearchArg.java | 8 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 61 +-
.../physical/CommonJoinTaskDispatcher.java | 3 +-
.../physical/GenMRSkewJoinProcessor.java | 17 +-
.../physical/GenSparkSkewJoinProcessor.java | 19 +-
.../ql/optimizer/physical/SerializeFilter.java | 24 +-
.../physical/SortMergeJoinTaskDispatcher.java | 5 +-
.../ppr/PartitionExpressionForMetastore.java | 8 +-
.../optimizer/spark/SplitSparkWorkResolver.java | 4 +-
.../hadoop/hive/ql/parse/GenTezUtils.java | 11 +-
.../hive/ql/parse/spark/GenSparkUtils.java | 10 +-
.../hive/ql/parse/spark/SplitOpTreeForDPP.java | 7 +-
.../hive/ql/stats/fs/FSStatsAggregator.java | 14 +-
.../hive/ql/stats/fs/FSStatsPublisher.java | 14 +-
.../hive/metastore/TestMetastoreExpr.java | 10 +-
.../apache/hadoop/hive/ql/exec/TestPlan.java | 8 +-
.../hadoop/hive/ql/exec/TestUtilities.java | 20 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 12 +-
.../hive/ql/io/orc/TestOrcSplitElimination.java | 40 +-
.../io/parquet/TestParquetRowGroupFilter.java | 27 +-
.../ql/io/sarg/TestConvertAstToSearchArg.java | 35 +-
39 files changed, 1054 insertions(+), 934 deletions(-)
----------------------------------------------------------------------
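
At the core of the change is the borrow/release discipline that replaces the old thread-local Kryo cache. In outline (a sketch against the borrowKryo()/releaseKryo() API added in SerializationUtilities below; the wrapper method is hypothetical):

import com.esotericsoftware.kryo.Kryo;

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;

// Borrow a Kryo instance from the shared, soft-referenced pool instead of a per-thread cache.
static void withPooledKryo() {
  Kryo kryo = SerializationUtilities.borrowKryo();
  try {
    // ... serialize or deserialize with this kryo instance ...
  } finally {
    SerializationUtilities.releaseKryo(kryo);  // return the instance to the pool for reuse
  }
}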
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
index 2c0e3c2..d5cc9a5 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.hive.accumulo.predicate.compare.NotEqual;
import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;
import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -358,7 +358,7 @@ public class AccumuloPredicateHandler {
if (filteredExprSerialized == null)
return null;
- return Utilities.deserializeExpression(filteredExprSerialized);
+ return SerializationUtilities.deserializeExpression(filteredExprSerialized);
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
index 15ccda7..88e4530 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;
import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
import org.apache.hadoop.hive.accumulo.serde.TooManyAccumuloColumnsException;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -117,7 +117,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPEqual(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
List<IndexSearchCondition> sConditions = handler.getSearchConditions(conf);
@@ -134,7 +134,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPEqual(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -157,7 +157,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPGreaterThan(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -182,7 +182,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPEqualOrGreaterThan(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -206,7 +206,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPLessThan(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -231,7 +231,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPEqualOrLessThan(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -273,7 +273,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPAnd(), bothFilters);
- String filterExpr = Utilities.serializeExpression(both);
+ String filterExpr = SerializationUtilities.serializeExpression(both);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
Collection<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -309,7 +309,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPAnd(), bothFilters);
- String filterExpr = Utilities.serializeExpression(both);
+ String filterExpr = SerializationUtilities.serializeExpression(both);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
List<Range> ranges = handler.getRanges(conf, columnMapper);
@@ -329,7 +329,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPEqual(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
List<IndexSearchCondition> sConditions = handler.getSearchConditions(conf);
@@ -356,7 +356,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPEqual(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
List<IndexSearchCondition> sConditions = handler.getSearchConditions(conf);
assertEquals(sConditions.size(), 1);
@@ -375,7 +375,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPNotNull(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
List<IndexSearchCondition> sConditions = handler.getSearchConditions(conf);
assertEquals(sConditions.size(), 1);
@@ -417,7 +417,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPAnd(), bothFilters);
- String filterExpr = Utilities.serializeExpression(both);
+ String filterExpr = SerializationUtilities.serializeExpression(both);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
try {
List<IteratorSetting> iterators = handler.getIterators(conf, columnMapper);
@@ -468,7 +468,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPAnd(), bothFilters);
- String filterExpr = Utilities.serializeExpression(both);
+ String filterExpr = SerializationUtilities.serializeExpression(both);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
conf.setBoolean(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY, false);
try {
@@ -519,7 +519,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPAnd(), bothFilters);
- String filterExpr = Utilities.serializeExpression(both);
+ String filterExpr = SerializationUtilities.serializeExpression(both);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
List<IteratorSetting> iterators = handler.getIterators(conf, columnMapper);
assertEquals(iterators.size(), 2);
@@ -665,7 +665,7 @@ public class TestAccumuloPredicateHandler {
ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPAnd(), bothFilters);
- String filterExpr = Utilities.serializeExpression(both);
+ String filterExpr = SerializationUtilities.serializeExpression(both);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
// Should make ['f', 'm\0')
@@ -697,7 +697,7 @@ public class TestAccumuloPredicateHandler {
new GenericUDFOPLessThan(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
// Should make (100, +inf)
@@ -738,7 +738,7 @@ public class TestAccumuloPredicateHandler {
new GenericUDFOPLessThan(), children);
assertNotNull(node);
- String filterExpr = Utilities.serializeExpression(node);
+ String filterExpr = SerializationUtilities.serializeExpression(node);
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
// Should make (100, +inf)
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index b17714f..88d1865 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -20,12 +20,9 @@ package org.apache.hadoop.hive.hbase;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -39,7 +36,7 @@ import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -74,6 +71,8 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* HiveHBaseTableInputFormat implements InputFormat for HBase storage handler
@@ -187,7 +186,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
Scan scan = new Scan();
String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
if (filterObjectSerialized != null) {
- HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized,
+ HBaseScanRange range = SerializationUtilities.deserializeObject(filterObjectSerialized,
HBaseScanRange.class);
try {
range.setup(scan, jobConf);
@@ -203,7 +202,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
}
ExprNodeGenericFuncDesc filterExpr =
- Utilities.deserializeExpression(filterExprSerialized);
+ SerializationUtilities.deserializeExpression(filterExprSerialized);
String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey];
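
Both branches above come from the same read-side pattern: a planner-serialized filter object is rebuilt by class. A sketch with a hypothetical helper name:

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper: rebuild a pushed-down filter object of a known class,
// e.g. readFilterObject(jobConf, HBaseScanRange.class).
static <T> T readFilterObject(JobConf jobConf, Class<T> clazz) {
  String serialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
  return serialized == null ? null : SerializationUtilities.deserializeObject(serialized, clazz);
}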
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
index 41571fc..4ab497e 100644
--- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
@@ -18,9 +18,15 @@
*/
package org.apache.hive.hcatalog.api;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ObjectPair;
@@ -47,7 +53,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -68,13 +74,9 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* The HCatClientHMSImpl is the Hive Metastore client based implementation of
@@ -584,7 +586,7 @@ public class HCatClientHMSImpl extends HCatClient {
ExprNodeGenericFuncDesc partitionExpression = new ExpressionBuilder(table, partitionSpec).build();
ObjectPair<Integer, byte[]> serializedPartitionExpression =
new ObjectPair<Integer, byte[]>(partitionSpec.size(),
- Utilities.serializeExpressionToKryo(partitionExpression));
+ SerializationUtilities.serializeExpressionToKryo(partitionExpression));
hmsClient.dropPartitions(table.getDbName(), table.getTableName(), Arrays.asList(serializedPartitionExpression),
deleteData && !isExternal(table), // Delete data?
ifExists, // Fail if table doesn't exist?
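
Unlike the conf-string call sites, the metastore path ships raw Kryo bytes. Roughly, with a hypothetical helper name:

import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;

// Hypothetical helper: package an expression as the (spec size, Kryo bytes) pair
// passed to the drop-partitions call above.
static ObjectPair<Integer, byte[]> toDropPartitionsArg(int specSize,
    ExprNodeGenericFuncDesc partitionExpression) {
  byte[] exprBytes = SerializationUtilities.serializeExpressionToKryo(partitionExpression);
  return new ObjectPair<Integer, byte[]>(specSize, exprBytes);
}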
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 6713a2f..5b2c8c2 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -57,6 +57,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -1352,7 +1353,7 @@ public class QTestUtil {
try {
conf.set(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "javaXML");
for (Task<? extends Serializable> plan : tasks) {
- Utilities.serializePlan(plan, ofs, conf);
+ SerializationUtilities.serializePlan(plan, ofs, conf);
}
ofs.close();
fixXml4JDK7(outf.getPath());
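
QTestUtil pins the plan format explicitly, and the same serializePlan entry point honors the setting. A sketch with a hypothetical helper name:

import java.io.OutputStream;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;

// Hypothetical helper: force the javaXML plan format (the default is "kryo");
// serializePlan dispatches on HiveConf.ConfVars.PLAN_SERIALIZATION.
static void writePlanAsXml(Object plan, OutputStream ofs, HiveConf conf) {
  conf.set(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "javaXML");
  SerializationUtilities.serializePlan(plan, ofs, conf);
}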
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index cab0fc8..fbc5ea4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -21,16 +21,12 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -68,6 +64,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hive.common.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Map side Join operator implementation.
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
index 6c11637..721fbaa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
@@ -40,7 +40,7 @@ import java.util.Stack;
import org.antlr.runtime.CommonToken;
import org.antlr.runtime.tree.BaseTree;
import org.antlr.runtime.tree.CommonTree;
-import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities.EnumDelegate;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
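
The relocated EnumDelegate plugs into java.beans XML encoding exactly as before. For illustration (hypothetical helper; Direction is chosen only as an example enum):

import java.beans.XMLEncoder;
import java.io.ByteArrayOutputStream;

import org.apache.hadoop.hive.ql.exec.SerializationUtilities.EnumDelegate;
import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;

// Hypothetical helper: register EnumDelegate so enums encode as
// Enum.valueOf(Class, name) expressions in the XML stream.
static byte[] encodeDirection(Direction dir) {
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  XMLEncoder encoder = new XMLEncoder(out);
  encoder.setPersistenceDelegate(Direction.class, new EnumDelegate());
  encoder.writeObject(dir);
  encoder.close();
  return out.toByteArray();
}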
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
new file mode 100644
index 0000000..d5e946e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -0,0 +1,730 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.beans.DefaultPersistenceDelegate;
+import java.beans.Encoder;
+import java.beans.ExceptionListener;
+import java.beans.Expression;
+import java.beans.PersistenceDelegate;
+import java.beans.Statement;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.antlr.runtime.CommonToken;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantStructObjectInspector;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.pool.KryoFactory;
+import com.esotericsoftware.kryo.pool.KryoPool;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+
+/**
+ * Utilities related to serialization and deserialization.
+ */
+public class SerializationUtilities {
+ private static final String CLASS_NAME = SerializationUtilities.class.getName();
+ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+ private static KryoFactory factory = new KryoFactory() {
+ public Kryo create() {
+ Kryo kryo = new Kryo();
+ kryo.register(java.sql.Date.class, new SqlDateSerializer());
+ kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
+ kryo.register(Path.class, new PathSerializer());
+ kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
+ ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
+ .setFallbackInstantiatorStrategy(
+ new StdInstantiatorStrategy());
+ removeField(kryo, Operator.class, "colExprMap");
+ removeField(kryo, AbstractOperatorDesc.class, "statistics");
+ kryo.register(MapWork.class);
+ kryo.register(ReduceWork.class);
+ kryo.register(TableDesc.class);
+ kryo.register(UnionOperator.class);
+ kryo.register(FileSinkOperator.class);
+ kryo.register(HiveIgnoreKeyTextOutputFormat.class);
+ kryo.register(StandardConstantListObjectInspector.class);
+ kryo.register(StandardConstantMapObjectInspector.class);
+ kryo.register(StandardConstantStructObjectInspector.class);
+ kryo.register(SequenceFileInputFormat.class);
+ kryo.register(HiveSequenceFileOutputFormat.class);
+ kryo.register(SparkEdgeProperty.class);
+ kryo.register(SparkWork.class);
+ kryo.register(Pair.class);
+ return kryo;
+ }
+ };
+
+  // A bounded queue could be specified here, but that would lead to blocking.
+  // ConcurrentLinkedQueue is unbounded, and soft-referenced kryo instances are released
+  // under memory pressure.
+ private static KryoPool kryoPool = new KryoPool.Builder(factory).softReferences().build();
+
+ /**
+   * By default, the kryo pool uses a ConcurrentLinkedQueue, which is unbounded. To facilitate
+   * reuse of kryo objects, call releaseKryo() when done with a borrowed instance. The class
+   * loader of the returned instance is set to the current thread's context class loader.
+ *
+ * @return kryo instance
+ */
+ public static Kryo borrowKryo() {
+ Kryo kryo = kryoPool.borrow();
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ return kryo;
+ }
+
+ /**
+ * Release kryo instance back to the pool.
+ *
+ * @param kryo - kryo instance to be released
+ */
+ public static void releaseKryo(Kryo kryo) {
+ kryoPool.release(kryo);
+ }
+
+ private static void removeField(Kryo kryo, Class type, String fieldName) {
+ FieldSerializer fld = new FieldSerializer(kryo, type);
+ fld.removeField(fieldName);
+ kryo.register(type, fld);
+ }
+
+ /**
+ * Kryo serializer for timestamp.
+ */
+ private static class TimestampSerializer extends
+ com.esotericsoftware.kryo.Serializer<Timestamp> {
+
+ @Override
+ public Timestamp read(Kryo kryo, Input input, Class<Timestamp> clazz) {
+ Timestamp ts = new Timestamp(input.readLong());
+ ts.setNanos(input.readInt());
+ return ts;
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, Timestamp ts) {
+ output.writeLong(ts.getTime());
+ output.writeInt(ts.getNanos());
+ }
+ }
+
+ /**
+   * Custom Kryo serializer for java.sql.Date; without it, Kryo confuses
+   * java.sql.Date with java.util.Date during deserialization.
+ */
+ private static class SqlDateSerializer extends
+ com.esotericsoftware.kryo.Serializer<java.sql.Date> {
+
+ @Override
+ public java.sql.Date read(Kryo kryo, Input input, Class<java.sql.Date> clazz) {
+ return new java.sql.Date(input.readLong());
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, java.sql.Date sqlDate) {
+ output.writeLong(sqlDate.getTime());
+ }
+ }
+
+ private static class PathSerializer extends com.esotericsoftware.kryo.Serializer<Path> {
+
+ @Override
+ public void write(Kryo kryo, Output output, Path path) {
+ output.writeString(path.toUri().toString());
+ }
+
+ @Override
+ public Path read(Kryo kryo, Input input, Class<Path> type) {
+ return new Path(URI.create(input.readString()));
+ }
+ }
+
+ /**
+ * A kryo {@link Serializer} for lists created via {@link Arrays#asList(Object...)}.
+ * <p>
+   * Note: This serializer does not support cyclic references; if one of the contained
+   * objects holds a reference back to the list, deserialization may fail.
+ * </p>
+ * <p/>
+ * This is from kryo-serializers package. Added explicitly to avoid classpath issues.
+ */
+ private static class ArraysAsListSerializer
+ extends com.esotericsoftware.kryo.Serializer<List<?>> {
+
+ private Field _arrayField;
+
+ public ArraysAsListSerializer() {
+ try {
+ _arrayField = Class.forName("java.util.Arrays$ArrayList").getDeclaredField("a");
+ _arrayField.setAccessible(true);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ // Immutable causes #copy(obj) to return the original object
+ setImmutable(true);
+ }
+
+ @Override
+ public List<?> read(final Kryo kryo, final Input input, final Class<List<?>> type) {
+ final int length = input.readInt(true);
+ Class<?> componentType = kryo.readClass(input).getType();
+ if (componentType.isPrimitive()) {
+ componentType = getPrimitiveWrapperClass(componentType);
+ }
+ try {
+ final Object items = Array.newInstance(componentType, length);
+ for (int i = 0; i < length; i++) {
+ Array.set(items, i, kryo.readClassAndObject(input));
+ }
+ return Arrays.asList((Object[]) items);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void write(final Kryo kryo, final Output output, final List<?> obj) {
+ try {
+ final Object[] array = (Object[]) _arrayField.get(obj);
+ output.writeInt(array.length, true);
+ final Class<?> componentType = array.getClass().getComponentType();
+ kryo.writeClass(output, componentType);
+ for (final Object item : array) {
+ kryo.writeClassAndObject(output, item);
+ }
+ } catch (final RuntimeException e) {
+ // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
+ // handles SerializationException specifically (resizing the buffer)...
+ throw e;
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Class<?> getPrimitiveWrapperClass(final Class<?> c) {
+ if (c.isPrimitive()) {
+ if (c.equals(Long.TYPE)) {
+ return Long.class;
+ } else if (c.equals(Integer.TYPE)) {
+ return Integer.class;
+ } else if (c.equals(Double.TYPE)) {
+ return Double.class;
+ } else if (c.equals(Float.TYPE)) {
+ return Float.class;
+ } else if (c.equals(Boolean.TYPE)) {
+ return Boolean.class;
+ } else if (c.equals(Character.TYPE)) {
+ return Character.class;
+ } else if (c.equals(Short.TYPE)) {
+ return Short.class;
+ } else if (c.equals(Byte.TYPE)) {
+ return Byte.class;
+ }
+ }
+ return c;
+ }
+ }
+
+
+ /**
+ * Serializes the plan.
+ *
+ * @param plan The plan, such as QueryPlan, MapredWork, etc.
+ * @param out The stream to write to.
+   * @param conf Configuration used to pick the desired serialization format (kryo or javaXML).
+ */
+ public static void serializePlan(Object plan, OutputStream out, Configuration conf) {
+ serializePlan(plan, out, conf, false);
+ }
+
+ public static void serializePlan(Kryo kryo, Object plan, OutputStream out, Configuration conf) {
+ serializePlan(kryo, plan, out, conf, false);
+ }
+
+ private static void serializePlan(Object plan, OutputStream out, Configuration conf,
+ boolean cloningPlan) {
+ Kryo kryo = borrowKryo();
+ try {
+ serializePlan(kryo, plan, out, conf, cloningPlan);
+ } finally {
+ releaseKryo(kryo);
+ }
+ }
+
+ private static void serializePlan(Kryo kryo, Object plan, OutputStream out, Configuration conf,
+ boolean cloningPlan) {
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
+ String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo");
+ LOG.info("Serializing " + plan.getClass().getSimpleName() + " via " + serializationType);
+ if ("javaXML".equalsIgnoreCase(serializationType)) {
+ serializeObjectByJavaXML(plan, out);
+ } else {
+      // Both the cloning and non-cloning paths currently use the same Kryo serializer.
+      serializeObjectByKryo(kryo, plan, out);
+ }
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
+ }
+
+ /**
+ * Deserializes the plan.
+ *
+ * @param in The stream to read from.
+ * @param planClass class of plan
+ * @param conf configuration
+ * @return The plan, such as QueryPlan, MapredWork, etc.
+ */
+ public static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf) {
+ return deserializePlan(in, planClass, conf, false);
+ }
+
+ public static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass,
+ Configuration conf) {
+ return deserializePlan(kryo, in, planClass, conf, false);
+ }
+
+ private static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf,
+ boolean cloningPlan) {
+ Kryo kryo = borrowKryo();
+ T result = null;
+ try {
+ result = deserializePlan(kryo, in, planClass, conf, cloningPlan);
+ } finally {
+ releaseKryo(kryo);
+ }
+ return result;
+ }
+
+ private static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass,
+ Configuration conf, boolean cloningPlan) {
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
+ T plan;
+ String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo");
+ LOG.info("Deserializing " + planClass.getSimpleName() + " via " + serializationType);
+ if ("javaXML".equalsIgnoreCase(serializationType)) {
+ plan = deserializeObjectByJavaXML(in);
+ } else {
+      // Both the cloning and non-cloning paths currently use the same Kryo deserializer.
+      plan = deserializeObjectByKryo(kryo, in, planClass);
+ }
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
+ return plan;
+ }
+
+ /**
+   * Clones a plan by serializing and deserializing it. Do not use unless necessary.
+ * @param plan The plan.
+ * @return The clone.
+ */
+ public static MapredWork clonePlan(MapredWork plan) {
+ // TODO: need proper clone. Meanwhile, let's at least keep this horror in one place
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ Configuration conf = new HiveConf();
+ serializePlan(plan, baos, conf, true);
+ MapredWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+ MapredWork.class, conf, true);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
+ return newPlan;
+ }
+
+ /**
+   * Clones a BaseWork by serializing and deserializing it. Do not use unless necessary.
+ * @param plan The plan.
+ * @return The clone.
+ */
+ public static BaseWork cloneBaseWork(BaseWork plan) {
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ Configuration conf = new HiveConf();
+ serializePlan(plan, baos, conf, true);
+ BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+ plan.getClass(), conf, true);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
+ return newPlan;
+ }
+
+ /**
+ * Serialize the object. This helper function mainly makes sure that enums,
+ * counters, etc are handled properly.
+ */
+ private static void serializeObjectByJavaXML(Object plan, OutputStream out) {
+ XMLEncoder e = new XMLEncoder(out);
+ e.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void exceptionThrown(Exception e) {
+ LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ throw new RuntimeException("Cannot serialize object", e);
+ }
+ });
+ // workaround for java 1.5
+ e.setPersistenceDelegate(PlanUtils.ExpressionTypes.class, new EnumDelegate());
+ e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
+ e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
+ e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
+
+ e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate());
+ e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate());
+ e.setPersistenceDelegate(CommonToken.class, new CommonTokenDelegate());
+ e.setPersistenceDelegate(Path.class, new PathDelegate());
+
+ e.writeObject(plan);
+ e.close();
+ }
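+
+ // The javaXML path above runs only when PLAN_SERIALIZATION is set to
+ // "javaXML"; the delegates registered above keep XMLEncoder from
+ // mis-encoding enums, dates, ANTLR CommonTokens, and Paths.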
+
+
+ /**
+ * Java 1.5 workaround. From http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403
+ */
+ public static class EnumDelegate extends DefaultPersistenceDelegate {
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out) {
+ return new Expression(Enum.class, "valueOf", new Object[] {oldInstance.getClass(),
+ ((Enum<?>) oldInstance).name()});
+ }
+
+ @Override
+ protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+ return oldInstance == newInstance;
+ }
+ }
+
+ public static class MapDelegate extends DefaultPersistenceDelegate {
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out) {
+ Map oldMap = (Map) oldInstance;
+ HashMap newMap = new HashMap(oldMap);
+ return new Expression(newMap, HashMap.class, "new", new Object[] {});
+ }
+
+ @Override
+ protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+ return false;
+ }
+
+ @Override
+ protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+ // A Map is not a Collection: replay the entries with put() instead of add().
+ Map oldMap = (Map) oldInstance;
+ Map newMap = (Map) newInstance;
+
+ if (newMap.size() != 0) {
+ out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
+ }
+ for (Iterator i = oldMap.entrySet().iterator(); i.hasNext();) {
+ Map.Entry entry = (Map.Entry) i.next();
+ out.writeStatement(new Statement(oldInstance, "put",
+ new Object[] {entry.getKey(), entry.getValue()}));
+ }
+ }
+ }
+
+ public static class SetDelegate extends DefaultPersistenceDelegate {
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out) {
+ Set oldSet = (Set) oldInstance;
+ HashSet newSet = new HashSet(oldSet);
+ return new Expression(newSet, HashSet.class, "new", new Object[] {});
+ }
+
+ @Override
+ protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+ return false;
+ }
+
+ @Override
+ protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+ java.util.Collection oldO = (java.util.Collection) oldInstance;
+ java.util.Collection newO = (java.util.Collection) newInstance;
+
+ if (newO.size() != 0) {
+ out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
+ }
+ for (Iterator i = oldO.iterator(); i.hasNext();) {
+ out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
+ }
+ }
+
+ }
+
+ public static class ListDelegate extends DefaultPersistenceDelegate {
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out) {
+ List oldList = (List) oldInstance;
+ ArrayList newList = new ArrayList(oldList);
+ return new Expression(newList, ArrayList.class, "new", new Object[] {});
+ }
+
+ @Override
+ protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+ return false;
+ }
+
+ @Override
+ protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+ java.util.Collection oldO = (java.util.Collection) oldInstance;
+ java.util.Collection newO = (java.util.Collection) newInstance;
+
+ if (newO.size() != 0) {
+ out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
+ }
+ for (Iterator i = oldO.iterator(); i.hasNext();) {
+ out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
+ }
+ }
+
+ }
+
+ /**
+ * DatePersistenceDelegate. Needed to serialize java.util.Date
+ * since it is not serialization friendly.
+ * Also works for java.sql.Date since it derives from java.util.Date.
+ */
+ public static class DatePersistenceDelegate extends PersistenceDelegate {
+
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out) {
+ Date dateVal = (Date)oldInstance;
+ Object[] args = { dateVal.getTime() };
+ return new Expression(dateVal, dateVal.getClass(), "new", args);
+ }
+
+ @Override
+ protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+ if (oldInstance == null || newInstance == null) {
+ return false;
+ }
+ return oldInstance.getClass() == newInstance.getClass();
+ }
+ }
+
+ /**
+ * TimestampPersistenceDelegate. Needed to serialize java.sql.Timestamp since
+ * it is not serialization friendly.
+ */
+ public static class TimestampPersistenceDelegate extends DatePersistenceDelegate {
+ @Override
+ protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+ Timestamp ts = (Timestamp)oldInstance;
+ Object[] args = { ts.getNanos() };
+ Statement stmt = new Statement(oldInstance, "setNanos", args);
+ out.writeStatement(stmt);
+ }
+ }
+
+ /**
+ * Need to serialize org.antlr.runtime.CommonToken
+ */
+ public static class CommonTokenDelegate extends PersistenceDelegate {
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out) {
+ CommonToken ct = (CommonToken)oldInstance;
+ Object[] args = {ct.getType(), ct.getText()};
+ return new Expression(ct, ct.getClass(), "new", args);
+ }
+ }
+
+ public static class PathDelegate extends PersistenceDelegate {
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out) {
+ Path p = (Path)oldInstance;
+ Object[] args = {p.toString()};
+ return new Expression(p, p.getClass(), "new", args);
+ }
+ }
+
+ /**
+ * @param plan Usually of type MapredWork, MapredLocalWork etc.
+ * @param out stream in which serialized plan is written into
+ */
+ private static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) {
+ Output output = new Output(out);
+ kryo.setClassLoader(Utilities.getSessionSpecifiedClassLoader());
+ kryo.writeObject(output, plan);
+ output.close();
+ }
+
+ /**
+ * De-serialize an object. This helper function mainly makes sure that enums,
+ * counters, etc. are handled properly.
+ */
+ @SuppressWarnings("unchecked")
+ private static <T> T deserializeObjectByJavaXML(InputStream in) {
+ XMLDecoder d = null;
+ try {
+ d = new XMLDecoder(in, null, null);
+ return (T) d.readObject();
+ } finally {
+ if (null != d) {
+ d.close();
+ }
+ }
+ }
+
+ private static <T> T deserializeObjectByKryo(Kryo kryo, InputStream in, Class<T> clazz) {
+ Input inp = new Input(in);
+ kryo.setClassLoader(Utilities.getSessionSpecifiedClassLoader());
+ T t = kryo.readObject(inp, clazz);
+ inp.close();
+ return t;
+ }
+
+ public static List<Operator<?>> cloneOperatorTree(Configuration conf, List<Operator<?>> roots) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ serializePlan(roots, baos, conf, true);
+ @SuppressWarnings("unchecked")
+ List<Operator<?>> result =
+ deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+ roots.getClass(), conf, true);
+ return result;
+ }
+
+ /**
+ * Serializes expression via Kryo.
+ * @param expr Expression.
+ * @return Bytes.
+ */
+ public static byte[] serializeExpressionToKryo(ExprNodeGenericFuncDesc expr) {
+ return serializeObjectToKryo(expr);
+ }
+
+ /**
+ * Deserializes expression from Kryo.
+ * @param bytes Bytes containing the expression.
+ * @return Expression, or null if deserialization succeeds but the result is not of the expected type.
+ */
+ public static ExprNodeGenericFuncDesc deserializeExpressionFromKryo(byte[] bytes) {
+ return deserializeObjectFromKryo(bytes, ExprNodeGenericFuncDesc.class);
+ }
+
+ public static String serializeExpression(ExprNodeGenericFuncDesc expr) {
+ try {
+ return new String(Base64.encodeBase64(serializeExpressionToKryo(expr)), "UTF-8");
+ } catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException("UTF-8 support required", ex);
+ }
+ }
+
+ public static ExprNodeGenericFuncDesc deserializeExpression(String s) {
+ byte[] bytes;
+ try {
+ bytes = Base64.decodeBase64(s.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException("UTF-8 support required", ex);
+ }
+ return deserializeExpressionFromKryo(bytes);
+ }
+
+ private static byte[] serializeObjectToKryo(Serializable object) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Output output = new Output(baos);
+ Kryo kryo = borrowKryo();
+ try {
+ kryo.writeObject(output, object);
+ } finally {
+ releaseKryo(kryo);
+ }
+ output.close();
+ return baos.toByteArray();
+ }
+
+ private static <T extends Serializable> T deserializeObjectFromKryo(byte[] bytes, Class<T> clazz) {
+ Input inp = new Input(new ByteArrayInputStream(bytes));
+ Kryo kryo = borrowKryo();
+ T func = null;
+ try {
+ func = kryo.readObject(inp, clazz);
+ } finally {
+ releaseKryo(kryo);
+ }
+ inp.close();
+ return func;
+ }
+
+ public static String serializeObject(Serializable expr) {
+ try {
+ return new String(Base64.encodeBase64(serializeObjectToKryo(expr)), "UTF-8");
+ } catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException("UTF-8 support required", ex);
+ }
+ }
+
+ public static <T extends Serializable> T deserializeObject(String s, Class<T> clazz) {
+ try {
+ return deserializeObjectFromKryo(Base64.decodeBase64(s.getBytes("UTF-8")), clazz);
+ } catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException("UTF-8 support required", ex);
+ }
+ }
+
+}
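Taken together, SerializationUtilities gives call sites a uniform borrow/serialize/release discipline. A minimal caller sketch, assuming only the methods shown in this file (the helper name planToBytes and its Object-typed work parameter are illustrative, not part of the patch):

    import java.io.ByteArrayOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import com.esotericsoftware.kryo.Kryo;

    // Illustrative caller: serialize any plan object to bytes with a pooled Kryo.
    static byte[] planToBytes(Object work, Configuration conf) {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
      Kryo kryo = SerializationUtilities.borrowKryo();
      try {
        SerializationUtilities.serializePlan(kryo, work, baos, conf);
      } finally {
        // Always return the instance to the pool, even if serialization throws.
        SerializationUtilities.releaseKryo(kryo);
      }
      return baos.toByteArray();
    }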
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index dacb80f..c01994f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -20,12 +20,8 @@ package org.apache.hadoop.hive.ql.exec;
import java.beans.DefaultPersistenceDelegate;
import java.beans.Encoder;
-import java.beans.ExceptionListener;
import java.beans.Expression;
-import java.beans.PersistenceDelegate;
import java.beans.Statement;
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
@@ -36,29 +32,22 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLDecoder;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLTransientException;
-import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
-import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
@@ -83,12 +72,10 @@ import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
-import org.antlr.runtime.CommonToken;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.WordUtils;
import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -150,22 +137,16 @@ import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.api.Adjacency;
@@ -181,9 +162,6 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantMapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -209,14 +187,10 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hive.common.util.ReflectionUtil;
-import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.google.common.base.Preconditions;
/**
@@ -391,6 +365,7 @@ public final class Utilities {
private static BaseWork getBaseWork(Configuration conf, String name) {
Path path = null;
InputStream in = null;
+ Kryo kryo = SerializationUtilities.borrowKryo();
try {
String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
if (engine.equals("spark")) {
@@ -401,7 +376,7 @@ public final class Utilities {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
Thread.currentThread().setContextClassLoader(newLoader);
- runtimeSerializationKryo.get().setClassLoader(newLoader);
+ kryo.setClassLoader(newLoader);
}
}
@@ -410,16 +385,7 @@ public final class Utilities {
assert path != null;
BaseWork gWork = gWorkMap.get(conf).get(path);
if (gWork == null) {
- Path localPath;
- if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
- localPath = new Path(name);
- } else if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
- localPath = path;
- } else {
- LOG.debug("***************non-local mode***************");
- localPath = new Path(name);
- }
- localPath = path;
+ Path localPath = path;
LOG.debug("local path = " + localPath);
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
LOG.debug("Loading plan from string: "+path.toUri().getPath());
@@ -438,29 +404,29 @@ public final class Utilities {
if(MAP_PLAN_NAME.equals(name)){
if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
- gWork = deserializePlan(in, MapWork.class, conf);
+ gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class, conf);
} else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
- gWork = deserializePlan(in, MergeFileWork.class, conf);
+ gWork = SerializationUtilities.deserializePlan(kryo, in, MergeFileWork.class, conf);
} else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
- gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
+ gWork = SerializationUtilities.deserializePlan(kryo, in, ColumnTruncateWork.class, conf);
} else if(PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
- gWork = deserializePlan(in, PartialScanWork.class,conf);
+ gWork = SerializationUtilities.deserializePlan(kryo, in, PartialScanWork.class,conf);
} else {
throw new RuntimeException("unable to determine work from configuration ."
+ MAPRED_MAPPER_CLASS + " was "+ conf.get(MAPRED_MAPPER_CLASS)) ;
}
} else if (REDUCE_PLAN_NAME.equals(name)) {
if(ExecReducer.class.getName().equals(conf.get(MAPRED_REDUCER_CLASS))) {
- gWork = deserializePlan(in, ReduceWork.class, conf);
+ gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class, conf);
} else {
throw new RuntimeException("unable to determine work from configuration ."
+ MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
}
} else if (name.contains(MERGE_PLAN_NAME)) {
if (name.startsWith(MAPNAME)) {
- gWork = deserializePlan(in, MapWork.class, conf);
+ gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class, conf);
} else if (name.startsWith(REDUCENAME)) {
- gWork = deserializePlan(in, ReduceWork.class, conf);
+ gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class, conf);
} else {
throw new RuntimeException("Unknown work type: " + name);
}
@@ -480,6 +446,7 @@ public final class Utilities {
LOG.error(msg, e);
throw new RuntimeException(msg, e);
} finally {
+ SerializationUtilities.releaseKryo(kryo);
if (in != null) {
try {
in.close();
@@ -523,163 +490,6 @@ public final class Utilities {
return ret;
}
- /**
- * Java 1.5 workaround. From http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403
- */
- public static class EnumDelegate extends DefaultPersistenceDelegate {
- @Override
- protected Expression instantiate(Object oldInstance, Encoder out) {
- return new Expression(Enum.class, "valueOf", new Object[] {oldInstance.getClass(),
- ((Enum<?>) oldInstance).name()});
- }
-
- @Override
- protected boolean mutatesTo(Object oldInstance, Object newInstance) {
- return oldInstance == newInstance;
- }
- }
-
- public static class MapDelegate extends DefaultPersistenceDelegate {
- @Override
- protected Expression instantiate(Object oldInstance, Encoder out) {
- Map oldMap = (Map) oldInstance;
- HashMap newMap = new HashMap(oldMap);
- return new Expression(newMap, HashMap.class, "new", new Object[] {});
- }
-
- @Override
- protected boolean mutatesTo(Object oldInstance, Object newInstance) {
- return false;
- }
-
- @Override
- protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
- java.util.Collection oldO = (java.util.Collection) oldInstance;
- java.util.Collection newO = (java.util.Collection) newInstance;
-
- if (newO.size() != 0) {
- out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
- }
- for (Iterator i = oldO.iterator(); i.hasNext();) {
- out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
- }
- }
- }
-
- public static class SetDelegate extends DefaultPersistenceDelegate {
- @Override
- protected Expression instantiate(Object oldInstance, Encoder out) {
- Set oldSet = (Set) oldInstance;
- HashSet newSet = new HashSet(oldSet);
- return new Expression(newSet, HashSet.class, "new", new Object[] {});
- }
-
- @Override
- protected boolean mutatesTo(Object oldInstance, Object newInstance) {
- return false;
- }
-
- @Override
- protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
- java.util.Collection oldO = (java.util.Collection) oldInstance;
- java.util.Collection newO = (java.util.Collection) newInstance;
-
- if (newO.size() != 0) {
- out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
- }
- for (Iterator i = oldO.iterator(); i.hasNext();) {
- out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
- }
- }
-
- }
-
- public static class ListDelegate extends DefaultPersistenceDelegate {
- @Override
- protected Expression instantiate(Object oldInstance, Encoder out) {
- List oldList = (List) oldInstance;
- ArrayList newList = new ArrayList(oldList);
- return new Expression(newList, ArrayList.class, "new", new Object[] {});
- }
-
- @Override
- protected boolean mutatesTo(Object oldInstance, Object newInstance) {
- return false;
- }
-
- @Override
- protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
- java.util.Collection oldO = (java.util.Collection) oldInstance;
- java.util.Collection newO = (java.util.Collection) newInstance;
-
- if (newO.size() != 0) {
- out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
- }
- for (Iterator i = oldO.iterator(); i.hasNext();) {
- out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
- }
- }
-
- }
-
- /**
- * DatePersistenceDelegate. Needed to serialize java.util.Date
- * since it is not serialization friendly.
- * Also works for java.sql.Date since it derives from java.util.Date.
- */
- public static class DatePersistenceDelegate extends PersistenceDelegate {
-
- @Override
- protected Expression instantiate(Object oldInstance, Encoder out) {
- Date dateVal = (Date)oldInstance;
- Object[] args = { dateVal.getTime() };
- return new Expression(dateVal, dateVal.getClass(), "new", args);
- }
-
- @Override
- protected boolean mutatesTo(Object oldInstance, Object newInstance) {
- if (oldInstance == null || newInstance == null) {
- return false;
- }
- return oldInstance.getClass() == newInstance.getClass();
- }
- }
-
- /**
- * TimestampPersistenceDelegate. Needed to serialize java.sql.Timestamp since
- * it is not serialization friendly.
- */
- public static class TimestampPersistenceDelegate extends DatePersistenceDelegate {
- @Override
- protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
- Timestamp ts = (Timestamp)oldInstance;
- Object[] args = { ts.getNanos() };
- Statement stmt = new Statement(oldInstance, "setNanos", args);
- out.writeStatement(stmt);
- }
- }
-
- /**
- * Need to serialize org.antlr.runtime.CommonToken
- */
- public static class CommonTokenDelegate extends PersistenceDelegate {
- @Override
- protected Expression instantiate(Object oldInstance, Encoder out) {
- CommonToken ct = (CommonToken)oldInstance;
- Object[] args = {ct.getType(), ct.getText()};
- return new Expression(ct, ct.getClass(), "new", args);
- }
- }
-
- public static class PathDelegate extends PersistenceDelegate {
- @Override
- protected Expression instantiate(Object oldInstance, Encoder out) {
- Path p = (Path)oldInstance;
- Object[] args = {p.toString()};
- return new Expression(p, p.getClass(), "new", args);
- }
- }
-
public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) {
String useName = conf.get(INPUT_NAME);
if (useName == null) {
@@ -702,6 +512,7 @@ public final class Utilities {
}
private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) {
+ Kryo kryo = SerializationUtilities.borrowKryo();
try {
setPlanPath(conf, hiveScratchDir);
@@ -714,7 +525,7 @@ public final class Utilities {
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
try {
out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED));
- serializePlan(w, out, conf);
+ SerializationUtilities.serializePlan(kryo, w, out, conf);
out.close();
out = null;
} finally {
@@ -728,7 +539,7 @@ public final class Utilities {
FileSystem fs = planPath.getFileSystem(conf);
try {
out = fs.create(planPath);
- serializePlan(w, out, conf);
+ SerializationUtilities.serializePlan(kryo, w, out, conf);
out.close();
out = null;
} finally {
@@ -760,6 +571,8 @@ public final class Utilities {
String msg = "Error caching " + name + ": " + e;
LOG.error(msg, e);
throw new RuntimeException(msg, e);
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
}
}
@@ -790,73 +603,6 @@ public final class Utilities {
return null;
}
- /**
- * Serializes expression via Kryo.
- * @param expr Expression.
- * @return Bytes.
- */
- public static byte[] serializeExpressionToKryo(ExprNodeGenericFuncDesc expr) {
- return serializeObjectToKryo(expr);
- }
-
- /**
- * Deserializes expression from Kryo.
- * @param bytes Bytes containing the expression.
- * @return Expression; null if deserialization succeeded, but the result type is incorrect.
- */
- public static ExprNodeGenericFuncDesc deserializeExpressionFromKryo(byte[] bytes) {
- return deserializeObjectFromKryo(bytes, ExprNodeGenericFuncDesc.class);
- }
-
- public static String serializeExpression(ExprNodeGenericFuncDesc expr) {
- try {
- return new String(Base64.encodeBase64(serializeExpressionToKryo(expr)), "UTF-8");
- } catch (UnsupportedEncodingException ex) {
- throw new RuntimeException("UTF-8 support required", ex);
- }
- }
-
- public static ExprNodeGenericFuncDesc deserializeExpression(String s) {
- byte[] bytes;
- try {
- bytes = Base64.decodeBase64(s.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException ex) {
- throw new RuntimeException("UTF-8 support required", ex);
- }
- return deserializeExpressionFromKryo(bytes);
- }
-
- private static byte[] serializeObjectToKryo(Serializable object) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Output output = new Output(baos);
- runtimeSerializationKryo.get().writeObject(output, object);
- output.close();
- return baos.toByteArray();
- }
-
- private static <T extends Serializable> T deserializeObjectFromKryo(byte[] bytes, Class<T> clazz) {
- Input inp = new Input(new ByteArrayInputStream(bytes));
- T func = runtimeSerializationKryo.get().readObject(inp, clazz);
- inp.close();
- return func;
- }
-
- public static String serializeObject(Serializable expr) {
- try {
- return new String(Base64.encodeBase64(serializeObjectToKryo(expr)), "UTF-8");
- } catch (UnsupportedEncodingException ex) {
- throw new RuntimeException("UTF-8 support required", ex);
- }
- }
-
- public static <T extends Serializable> T deserializeObject(String s, Class<T> clazz) {
- try {
- return deserializeObjectFromKryo(Base64.decodeBase64(s.getBytes("UTF-8")), clazz);
- } catch (UnsupportedEncodingException ex) {
- throw new RuntimeException("UTF-8 support required", ex);
- }
- }
-
public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
@Override
protected Expression instantiate(Object oldInstance, Encoder out) {
@@ -872,415 +618,6 @@ public final class Utilities {
}
}
- /**
- * Kryo serializer for timestamp.
- */
- private static class TimestampSerializer extends
- com.esotericsoftware.kryo.Serializer<Timestamp> {
-
- @Override
- public Timestamp read(Kryo kryo, Input input, Class<Timestamp> clazz) {
- Timestamp ts = new Timestamp(input.readLong());
- ts.setNanos(input.readInt());
- return ts;
- }
-
- @Override
- public void write(Kryo kryo, Output output, Timestamp ts) {
- output.writeLong(ts.getTime());
- output.writeInt(ts.getNanos());
- }
- }
-
- /** Custom Kryo serializer for sql date, otherwise Kryo gets confused between
- java.sql.Date and java.util.Date while deserializing
- */
- private static class SqlDateSerializer extends
- com.esotericsoftware.kryo.Serializer<java.sql.Date> {
-
- @Override
- public java.sql.Date read(Kryo kryo, Input input, Class<java.sql.Date> clazz) {
- return new java.sql.Date(input.readLong());
- }
-
- @Override
- public void write(Kryo kryo, Output output, java.sql.Date sqlDate) {
- output.writeLong(sqlDate.getTime());
- }
- }
-
- private static class CommonTokenSerializer extends com.esotericsoftware.kryo.Serializer<CommonToken> {
- @Override
- public CommonToken read(Kryo kryo, Input input, Class<CommonToken> clazz) {
- return new CommonToken(input.readInt(), input.readString());
- }
-
- @Override
- public void write(Kryo kryo, Output output, CommonToken token) {
- output.writeInt(token.getType());
- output.writeString(token.getText());
- }
- }
-
- private static class PathSerializer extends com.esotericsoftware.kryo.Serializer<Path> {
-
- @Override
- public void write(Kryo kryo, Output output, Path path) {
- output.writeString(path.toUri().toString());
- }
-
- @Override
- public Path read(Kryo kryo, Input input, Class<Path> type) {
- return new Path(URI.create(input.readString()));
- }
- }
-
- public static List<Operator<?>> cloneOperatorTree(Configuration conf, List<Operator<?>> roots) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- serializePlan(roots, baos, conf, true);
- @SuppressWarnings("unchecked")
- List<Operator<?>> result =
- deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
- roots.getClass(), conf, true);
- return result;
- }
-
- private static void serializePlan(Object plan, OutputStream out, Configuration conf, boolean cloningPlan) {
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
- String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo");
- LOG.info("Serializing " + plan.getClass().getSimpleName() + " via " + serializationType);
- if("javaXML".equalsIgnoreCase(serializationType)) {
- serializeObjectByJavaXML(plan, out);
- } else {
- if(cloningPlan) {
- serializeObjectByKryo(cloningQueryPlanKryo.get(), plan, out);
- } else {
- serializeObjectByKryo(runtimeSerializationKryo.get(), plan, out);
- }
- }
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
- }
- /**
- * Serializes the plan.
- * @param plan The plan, such as QueryPlan, MapredWork, etc.
- * @param out The stream to write to.
- * @param conf to pick which serialization format is desired.
- */
- public static void serializePlan(Object plan, OutputStream out, Configuration conf) {
- serializePlan(plan, out, conf, false);
- }
-
- private static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf, boolean cloningPlan) {
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
- T plan;
- String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo");
- LOG.info("Deserializing " + planClass.getSimpleName() + " via " + serializationType);
- if("javaXML".equalsIgnoreCase(serializationType)) {
- plan = deserializeObjectByJavaXML(in);
- } else {
- if(cloningPlan) {
- plan = deserializeObjectByKryo(cloningQueryPlanKryo.get(), in, planClass);
- } else {
- plan = deserializeObjectByKryo(runtimeSerializationKryo.get(), in, planClass);
- }
- }
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
- return plan;
- }
- /**
- * Deserializes the plan.
- * @param in The stream to read from.
- * @param planClass class of plan
- * @param conf configuration
- * @return The plan, such as QueryPlan, MapredWork, etc.
- */
- public static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf) {
- return deserializePlan(in, planClass, conf, false);
- }
-
- /**
- * Clones using the powers of XML. Do not use unless necessary.
- * @param plan The plan.
- * @return The clone.
- */
- public static MapredWork clonePlan(MapredWork plan) {
- // TODO: need proper clone. Meanwhile, let's at least keep this horror in one place
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- Configuration conf = new HiveConf();
- serializePlan(plan, baos, conf, true);
- MapredWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
- MapredWork.class, conf, true);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
- return newPlan;
- }
-
- /**
- * Clones using the powers of XML. Do not use unless necessary.
- * @param plan The plan.
- * @return The clone.
- */
- public static BaseWork cloneBaseWork(BaseWork plan) {
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- Configuration conf = new HiveConf();
- serializePlan(plan, baos, conf, true);
- BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
- plan.getClass(), conf, true);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
- return newPlan;
- }
-
- /**
- * Serialize the object. This helper function mainly makes sure that enums,
- * counters, etc are handled properly.
- */
- private static void serializeObjectByJavaXML(Object plan, OutputStream out) {
- XMLEncoder e = new XMLEncoder(out);
- e.setExceptionListener(new ExceptionListener() {
- @Override
- public void exceptionThrown(Exception e) {
- LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
- throw new RuntimeException("Cannot serialize object", e);
- }
- });
- // workaround for java 1.5
- e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
- e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
- e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
- e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
-
- e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate());
- e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate());
- e.setPersistenceDelegate(CommonToken.class, new CommonTokenDelegate());
- e.setPersistenceDelegate(Path.class, new PathDelegate());
-
- e.writeObject(plan);
- e.close();
- }
-
- /**
- * @param plan Usually of type MapredWork, MapredLocalWork etc.
- * @param out stream in which serialized plan is written into
- */
- private static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) {
- Output output = new Output(out);
- kryo.setClassLoader(getSessionSpecifiedClassLoader());
- kryo.writeObject(output, plan);
- output.close();
- }
-
- /**
- * De-serialize an object. This helper function mainly makes sure that enums,
- * counters, etc are handled properly.
- */
- @SuppressWarnings("unchecked")
- private static <T> T deserializeObjectByJavaXML(InputStream in) {
- XMLDecoder d = null;
- try {
- d = new XMLDecoder(in, null, null);
- return (T) d.readObject();
- } finally {
- if (null != d) {
- d.close();
- }
- }
- }
-
- private static <T> T deserializeObjectByKryo(Kryo kryo, InputStream in, Class<T> clazz ) {
- Input inp = new Input(in);
- kryo.setClassLoader(getSessionSpecifiedClassLoader());
- T t = kryo.readObject(inp,clazz);
- inp.close();
- return t;
- }
-
- // Kryo is not thread-safe,
- // Also new Kryo() is expensive, so we want to do it just once.
- public static ThreadLocal<Kryo>
- runtimeSerializationKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
- kryo.register(java.sql.Date.class, new SqlDateSerializer());
- kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
- kryo.register(Path.class, new PathSerializer());
- kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
- ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(
- new StdInstantiatorStrategy());
- removeField(kryo, Operator.class, "colExprMap");
- removeField(kryo, AbstractOperatorDesc.class, "statistics");
- kryo.register(MapWork.class);
- kryo.register(ReduceWork.class);
- kryo.register(TableDesc.class);
- kryo.register(UnionOperator.class);
- kryo.register(FileSinkOperator.class);
- kryo.register(HiveIgnoreKeyTextOutputFormat.class);
- kryo.register(StandardConstantListObjectInspector.class);
- kryo.register(StandardConstantMapObjectInspector.class);
- kryo.register(StandardConstantStructObjectInspector.class);
- kryo.register(SequenceFileInputFormat.class);
- kryo.register(HiveSequenceFileOutputFormat.class);
- return kryo;
- };
- };
- @SuppressWarnings("rawtypes")
- protected static void removeField(Kryo kryo, Class type, String fieldName) {
- FieldSerializer fld = new FieldSerializer(kryo, type);
- fld.removeField(fieldName);
- kryo.register(type, fld);
- }
-
- public static ThreadLocal<Kryo> sparkSerializationKryo = new ThreadLocal<Kryo>() {
- @Override
- protected synchronized Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
- kryo.register(java.sql.Date.class, new SqlDateSerializer());
- kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
- kryo.register(Path.class, new PathSerializer());
- kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
- ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
- removeField(kryo, Operator.class, "colExprMap");
- removeField(kryo, ColumnInfo.class, "objectInspector");
- removeField(kryo, AbstractOperatorDesc.class, "statistics");
- kryo.register(SparkEdgeProperty.class);
- kryo.register(MapWork.class);
- kryo.register(ReduceWork.class);
- kryo.register(SparkWork.class);
- kryo.register(TableDesc.class);
- kryo.register(Pair.class);
- kryo.register(UnionOperator.class);
- kryo.register(FileSinkOperator.class);
- kryo.register(HiveIgnoreKeyTextOutputFormat.class);
- kryo.register(StandardConstantListObjectInspector.class);
- kryo.register(StandardConstantMapObjectInspector.class);
- kryo.register(StandardConstantStructObjectInspector.class);
- kryo.register(SequenceFileInputFormat.class);
- kryo.register(HiveSequenceFileOutputFormat.class);
- return kryo;
- };
- };
-
- private static ThreadLocal<Kryo> cloningQueryPlanKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
- kryo.register(CommonToken.class, new CommonTokenSerializer());
- kryo.register(java.sql.Date.class, new SqlDateSerializer());
- kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
- kryo.register(Path.class, new PathSerializer());
- kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
- ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(
- new StdInstantiatorStrategy());
- removeField(kryo, Operator.class, "colExprMap");
- removeField(kryo, AbstractOperatorDesc.class, "statistics");
- kryo.register(MapWork.class);
- kryo.register(ReduceWork.class);
- kryo.register(TableDesc.class);
- kryo.register(UnionOperator.class);
- kryo.register(FileSinkOperator.class);
- kryo.register(HiveIgnoreKeyTextOutputFormat.class);
- kryo.register(StandardConstantListObjectInspector.class);
- kryo.register(StandardConstantMapObjectInspector.class);
- kryo.register(StandardConstantStructObjectInspector.class);
- kryo.register(SequenceFileInputFormat.class);
- kryo.register(HiveSequenceFileOutputFormat.class);
- return kryo;
- };
- };
-
- /**
- * A kryo {@link Serializer} for lists created via {@link Arrays#asList(Object...)}.
- * <p>
- * Note: This serializer does not support cyclic references, so if one of the objects
- * gets set the list as attribute this might cause an error during deserialization.
- * </p>
- *
- * This is from kryo-serializers package. Added explicitly to avoid classpath issues.
- */
- private static class ArraysAsListSerializer extends com.esotericsoftware.kryo.Serializer<List<?>> {
-
- private Field _arrayField;
-
- public ArraysAsListSerializer() {
- try {
- _arrayField = Class.forName( "java.util.Arrays$ArrayList" ).getDeclaredField( "a" );
- _arrayField.setAccessible( true );
- } catch ( final Exception e ) {
- throw new RuntimeException( e );
- }
- // Immutable causes #copy(obj) to return the original object
- setImmutable(true);
- }
-
- @Override
- public List<?> read(final Kryo kryo, final Input input, final Class<List<?>> type) {
- final int length = input.readInt(true);
- Class<?> componentType = kryo.readClass( input ).getType();
- if (componentType.isPrimitive()) {
- componentType = getPrimitiveWrapperClass(componentType);
- }
- try {
- final Object items = Array.newInstance( componentType, length );
- for( int i = 0; i < length; i++ ) {
- Array.set(items, i, kryo.readClassAndObject( input ));
- }
- return Arrays.asList( (Object[])items );
- } catch ( final Exception e ) {
- throw new RuntimeException( e );
- }
- }
-
- @Override
- public void write(final Kryo kryo, final Output output, final List<?> obj) {
- try {
- final Object[] array = (Object[]) _arrayField.get( obj );
- output.writeInt(array.length, true);
- final Class<?> componentType = array.getClass().getComponentType();
- kryo.writeClass( output, componentType );
- for( final Object item : array ) {
- kryo.writeClassAndObject( output, item );
- }
- } catch ( final RuntimeException e ) {
- // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
- // handles SerializationException specifically (resizing the buffer)...
- throw e;
- } catch ( final Exception e ) {
- throw new RuntimeException( e );
- }
- }
-
- private Class<?> getPrimitiveWrapperClass(final Class<?> c) {
- if (c.isPrimitive()) {
- if (c.equals(Long.TYPE)) {
- return Long.class;
- } else if (c.equals(Integer.TYPE)) {
- return Integer.class;
- } else if (c.equals(Double.TYPE)) {
- return Double.class;
- } else if (c.equals(Float.TYPE)) {
- return Float.class;
- } else if (c.equals(Boolean.TYPE)) {
- return Boolean.class;
- } else if (c.equals(Character.TYPE)) {
- return Character.class;
- } else if (c.equals(Short.TYPE)) {
- return Short.class;
- } else if (c.equals(Byte.TYPE)) {
- return Byte.class;
- }
- }
- return c;
- }
- }
-
public static TableDesc defaultTd;
static {
// by default we expect ^A separated strings
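The hunks above delete the thread-local Kryo caches (runtimeSerializationKryo, sparkSerializationKryo, cloningQueryPlanKryo) together with the serializers and delegates they registered; all of that now lives in SerializationUtilities. The before/after shape at a typical call site, using only names visible in this diff:

    // Before: one cached Kryo per thread, never explicitly returned.
    //   runtimeSerializationKryo.get().writeObject(output, object);

    // After: borrow from the shared pool, release in a finally block.
    Kryo kryo = SerializationUtilities.borrowKryo();
    try {
      kryo.writeObject(output, object);
    } finally {
      SerializationUtilities.releaseKryo(kryo);
    }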
http://git-wip-us.apache.org/repos/asf/hive/blob/2bb5e63c/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index df96d8c..2129bda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -740,12 +741,13 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
int ret;
if (localtask) {
memoryMXBean = ManagementFactory.getMemoryMXBean();
- MapredLocalWork plan = Utilities.deserializePlan(pathData, MapredLocalWork.class, conf);
+ MapredLocalWork plan = SerializationUtilities.deserializePlan(pathData, MapredLocalWork.class,
+ conf);
MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
ret = ed.executeInProcess(new DriverContext());
} else {
- MapredWork plan = Utilities.deserializePlan(pathData, MapredWork.class, conf);
+ MapredWork plan = SerializationUtilities.deserializePlan(pathData, MapredWork.class, conf);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
ret = ed.execute(new DriverContext());
}
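Not shown in this excerpt is the pool behind borrowKryo()/releaseKryo(). Kryo 3.x ships com.esotericsoftware.kryo.pool.KryoPool, and a soft-reference pool around a factory is the natural fit; a hedged sketch, with the factory body abbreviated to comments standing in for the registrations the deleted ThreadLocal initializers performed:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.pool.KryoFactory;
    import com.esotericsoftware.kryo.pool.KryoPool;

    // Sketch: a soft-reference pool of preconfigured Kryo instances.
    private static final KryoPool kryoPool = new KryoPool.Builder(new KryoFactory() {
      @Override
      public Kryo create() {
        Kryo kryo = new Kryo();
        // Register SqlDateSerializer, TimestampSerializer, PathSerializer,
        // ArraysAsListSerializer, and the removeField(...) tweaks here, as the
        // old ThreadLocal initialValue() methods did.
        return kryo;
      }
    }).softReferences().build();

    public static Kryo borrowKryo() {
      Kryo kryo = kryoPool.borrow();
      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
      return kryo;
    }

    public static void releaseKryo(Kryo kryo) {
      kryoPool.release(kryo);
    }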