Posted to commits@hive.apache.org by xu...@apache.org on 2015/08/02 05:49:16 UTC
[3/4] hive git commit: HIVE-11182: Enable optimized hash tables for spark [Spark Branch] (Rui reviewed by Xuefu)
HIVE-11182: Enable optimized hash tables for spark [Spark Branch] (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/26eb94fc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/26eb94fc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/26eb94fc
Branch: refs/heads/branch-1
Commit: 26eb94fcb7763fe437b6033776ec8593dfc4a69f
Parents: b4aae73
Author: Rui Li <ru...@intel.com>
Authored: Thu Jul 9 09:58:15 2015 +0800
Committer: xzhang <xz...@xzdt>
Committed: Sat Aug 1 20:35:02 2015 -0700
----------------------------------------------------------------------
.../hive/ql/exec/HashTableSinkOperator.java | 6 +-
.../persistence/MapJoinTableContainerSerDe.java | 63 +++++++++++++++-----
.../hive/ql/exec/spark/HashTableLoader.java | 26 +++++++-
3 files changed, 76 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
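For context: the optimized container this patch wires up is gated by HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE, i.e. the hive.mapjoin.optimized.hashtable setting. A session running on the Spark engine would opt in roughly as follows (illustrative snippet, not part of the commit; the default value can differ between releases):

    set hive.execution.engine=spark;
    set hive.mapjoin.optimized.hashtable=true;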
http://git-wip-us.apache.org/repos/asf/hive/blob/26eb94fc/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index 96283cd..63d4989 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -269,9 +269,9 @@ public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> i
   public void closeOp(boolean abort) throws HiveException {
     try {
       if (mapJoinTables == null) {
-      if (isLogDebugEnabled) {
-        LOG.debug("mapJoinTables is null");
-      }
+        if (isLogDebugEnabled) {
+          LOG.debug("mapJoinTables is null");
+        }
       } else {
         flushToFile();
       }
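Aside: isLogDebugEnabled here is the operator base class's cached copy of LOG.isDebugEnabled(), so hot paths skip repeated level checks. A minimal sketch of the idiom (illustrative class, not from this patch):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    final class GuardedDebugLogging {
      private static final Log LOG = LogFactory.getLog(GuardedDebugLogging.class);
      // Cache the level check once instead of asking the logger per call.
      private final boolean isLogDebugEnabled = LOG.isDebugEnabled();

      void onClose(Object mapJoinTables) {
        if (mapJoinTables == null) {
          if (isLogDebugEnabled) {
            LOG.debug("mapJoinTables is null");
          }
        }
      }
    }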
http://git-wip-us.apache.org/repos/asf/hive/blob/26eb94fc/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
index 92625f2..e97a9f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
@@ -26,10 +26,12 @@ import java.lang.reflect.Constructor;
 import java.util.ConcurrentModificationException;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -97,11 +99,12 @@ public class MapJoinTableContainerSerDe {
    * Loads the table container from a folder. Only used on Spark path.
    * @param fs FileSystem of the folder.
    * @param folder The folder to load table container.
+   * @param hconf The Hive configuration.
    * @return Loaded table.
    */
   @SuppressWarnings("unchecked")
-  public MapJoinPersistableTableContainer load(
-      FileSystem fs, Path folder) throws HiveException {
+  public MapJoinTableContainer load(
+      FileSystem fs, Path folder, Configuration hconf) throws HiveException {
     try {
       if (!fs.isDirectory(folder)) {
         throw new HiveException("Error, not a directory: " + folder);
@@ -116,7 +119,10 @@ public class MapJoinTableContainerSerDe {
       Writable keyContainer = keySerDe.getSerializedClass().newInstance();
       Writable valueContainer = valueSerDe.getSerializedClass().newInstance();
 
-      MapJoinPersistableTableContainer tableContainer = null;
+      MapJoinTableContainer tableContainer = null;
+
+      boolean useOptimizedContainer = HiveConf.getBoolVar(
+          hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
 
       for (FileStatus fileStatus: fileStatuses) {
         Path filePath = fileStatus.getPath();
@@ -131,18 +137,16 @@ public class MapJoinTableContainerSerDe {
           String name = in.readUTF();
           Map<String, String> metaData = (Map<String, String>) in.readObject();
           if (tableContainer == null) {
-            tableContainer = create(name, metaData);
+            tableContainer = useOptimizedContainer ?
+                new MapJoinBytesTableContainer(hconf, valueContext, -1, 0) :
+                create(name, metaData);
           }
-          int numKeys = in.readInt();
-          for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
-            MapJoinKeyObject key = new MapJoinKeyObject();
-            key.read(keyContext, in, keyContainer);
-            if (tableContainer.get(key) == null) {
-              tableContainer.put(key, new MapJoinEagerRowContainer());
-            }
-            MapJoinEagerRowContainer values = (MapJoinEagerRowContainer) tableContainer.get(key);
-            values.read(valueContext, in, valueContainer);
-            tableContainer.put(key, values);
+          if (useOptimizedContainer) {
+            loadOptimized((MapJoinBytesTableContainer) tableContainer,
+                in, keyContainer, valueContainer);
+          } else {
+            loadNormal((MapJoinPersistableTableContainer) tableContainer,
+                in, keyContainer, valueContainer);
           }
         } finally {
           if (in != null) {
@@ -152,6 +156,9 @@ public class MapJoinTableContainerSerDe {
           }
         }
       }
+      if (tableContainer != null) {
+        tableContainer.seal();
+      }
       return tableContainer;
     } catch (IOException e) {
       throw new HiveException("IO error while trying to create table container", e);
@@ -160,6 +167,34 @@ public class MapJoinTableContainerSerDe {
     }
   }
 
+  private void loadNormal(MapJoinPersistableTableContainer container,
+      ObjectInputStream in, Writable keyContainer, Writable valueContainer) throws Exception {
+    int numKeys = in.readInt();
+    for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+      MapJoinKeyObject key = new MapJoinKeyObject();
+      key.read(keyContext, in, keyContainer);
+      if (container.get(key) == null) {
+        container.put(key, new MapJoinEagerRowContainer());
+      }
+      MapJoinEagerRowContainer values = (MapJoinEagerRowContainer) container.get(key);
+      values.read(valueContext, in, valueContainer);
+      container.put(key, values);
+    }
+  }
+
+  private void loadOptimized(MapJoinBytesTableContainer container, ObjectInputStream in,
+      Writable key, Writable value) throws Exception {
+    int numKeys = in.readInt();
+    for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+      key.readFields(in);
+      long numRows = in.readLong();
+      for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+        value.readFields(in);
+        container.putRow(keyContext, key, valueContext, value);
+      }
+    }
+  }
+
   public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer)
       throws HiveException {
     int numKeys = tableContainer.size();
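For readers tracing the new load path: after the readUTF/readObject header, each file's payload is an int key count followed, per key, by the key's serialized Writable fields, a long row count, and that many serialized rows; loadNormal() and loadOptimized() above are two interpretations of that same layout. A stand-alone sketch of a writer emitting the per-key section, assuming a plain key-to-rows map (hypothetical helper, not part of the patch):

    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.io.Writable;

    final class HashTableDumpSketch {
      // Emits: numKeys, then per key its fields, a row count, and the rows.
      static void write(ObjectOutputStream out,
          Map<Writable, List<Writable>> table) throws IOException {
        out.writeInt(table.size());                 // numKeys
        for (Map.Entry<Writable, List<Writable>> e : table.entrySet()) {
          e.getKey().write(out);                    // serialized key fields
          out.writeLong(e.getValue().size());       // numRows under this key
          for (Writable row : e.getValue()) {
            row.write(out);                         // serialized row fields
          }
        }
      }
    }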
http://git-wip-us.apache.org/repos/asf/hive/blob/26eb94fc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
index 1d674e9..10e3497 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -35,6 +36,8 @@ import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -43,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkBucketMapJoinContext;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
@@ -93,10 +97,28 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       }
       FileSystem fs = FileSystem.get(baseDir.toUri(), hconf);
       BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
+      boolean firstContainer = true;
+      boolean useOptimizedContainer = HiveConf.getBoolVar(
+          hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
       for (int pos = 0; pos < mapJoinTables.length; pos++) {
         if (pos == desc.getPosBigTable() || mapJoinTables[pos] != null) {
           continue;
         }
+        if (useOptimizedContainer) {
+          MapJoinObjectSerDeContext keyCtx = mapJoinTableSerdes[pos].getKeyContext();
+          ObjectInspector keyOI = keyCtx.getSerDe().getObjectInspector();
+          if (!MapJoinBytesTableContainer.isSupportedKey(keyOI)) {
+            if (firstContainer) {
+              LOG.warn("Not using optimized table container. " +
+                  "Only a subset of mapjoin keys is supported.");
+              useOptimizedContainer = false;
+              HiveConf.setBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE, false);
+            } else {
+              throw new HiveException("Only a subset of mapjoin keys is supported.");
+            }
+          }
+        }
+        firstContainer = false;
         String bigInputPath = currentInputPath;
         if (currentInputPath != null && mapJoinCtx != null) {
           if (!desc.isBucketMapJoin()) {
@@ -124,14 +146,14 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException {
     LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path);
     if (!SparkUtilities.isDedicatedCluster(hconf)) {
-      return mapJoinTableSerde.load(fs, path);
+      return mapJoinTableSerde.load(fs, path, hconf);
     }
     MapJoinTableContainer mapJoinTable = SmallTableCache.get(path);
     if (mapJoinTable == null) {
       synchronized (path.toString().intern()) {
         mapJoinTable = SmallTableCache.get(path);
         if (mapJoinTable == null) {
-          mapJoinTable = mapJoinTableSerde.load(fs, path);
+          mapJoinTable = mapJoinTableSerde.load(fs, path, hconf);
           SmallTableCache.cache(path, mapJoinTable);
         }
       }
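A closing note on the caching above: on a dedicated cluster the loader consults SmallTableCache, then re-checks it after synchronizing on the interned path string, so concurrent tasks in one executor JVM deserialize a given small table at most once. The same double-checked idiom in isolation (illustrative class, not a Hive API):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Function;

    final class InternedKeyCache<V> {
      private final ConcurrentMap<String, V> cache = new ConcurrentHashMap<>();

      V getOrLoad(String key, Function<String, V> loadFn) {
        V v = cache.get(key);             // fast path, no lock taken
        if (v == null) {
          synchronized (key.intern()) {   // one loader per distinct key
            v = cache.get(key);           // re-check under the lock
            if (v == null) {
              v = loadFn.apply(key);      // expensive deserialization, once
              cache.put(key, v);
            }
          }
        }
        return v;
      }
    }

Locking on the interned string keeps the critical section per key rather than global, at the price of touching the JVM-wide intern pool; the patch inherits that trade-off from the existing code.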