Posted to commits@hive.apache.org by xu...@apache.org on 2015/08/02 05:49:16 UTC

[3/4] hive git commit: HIVE-11182: Enable optimized hash tables for spark [Spark Branch] (Rui reviewed by Xuefu)

HIVE-11182: Enable optimized hash tables for spark [Spark Branch] (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/26eb94fc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/26eb94fc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/26eb94fc

Branch: refs/heads/branch-1
Commit: 26eb94fcb7763fe437b6033776ec8593dfc4a69f
Parents: b4aae73
Author: Rui Li <ru...@intel.com>
Authored: Thu Jul 9 09:58:15 2015 +0800
Committer: xzhang <xz...@xzdt>
Committed: Sat Aug 1 20:35:02 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/HashTableSinkOperator.java     |  6 +-
 .../persistence/MapJoinTableContainerSerDe.java | 63 +++++++++++++++-----
 .../hive/ql/exec/spark/HashTableLoader.java     | 26 +++++++-
 3 files changed, 76 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
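
Note: the new Spark path is gated on HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE, read via
HiveConf.getBoolVar in both MapJoinTableContainerSerDe and the Spark HashTableLoader below. A
minimal sketch of flipping it programmatically (assuming an existing HiveConf instance named
conf; the ConfVar is the one used in this patch):

    // Enable (or disable) the optimized MapJoin hash table container for this configuration.
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE, true);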


http://git-wip-us.apache.org/repos/asf/hive/blob/26eb94fc/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index 96283cd..63d4989 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -269,9 +269,9 @@ public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> i
   public void closeOp(boolean abort) throws HiveException {
     try {
       if (mapJoinTables == null) {
-	if (isLogDebugEnabled) {
-	  LOG.debug("mapJoinTables is null");
-	}
+        if (isLogDebugEnabled) {
+          LOG.debug("mapJoinTables is null");
+        }
       } else {
         flushToFile();
       }
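
The hunk above only re-indents the existing guard (tabs replaced with spaces). The guard itself
is the usual pattern of caching LOG.isDebugEnabled() in a boolean so repeated checks do not go
through the logger. A minimal sketch of that pattern, assuming a Commons Logging Log named LOG:

    // Cache the debug flag once; later code tests a plain boolean.
    private final boolean isLogDebugEnabled = LOG.isDebugEnabled();

    void onMissingTables() {
      if (isLogDebugEnabled) {
        LOG.debug("mapJoinTables is null");  // only reached when debug logging is enabled
      }
    }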

http://git-wip-us.apache.org/repos/asf/hive/blob/26eb94fc/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
index 92625f2..e97a9f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
@@ -26,10 +26,12 @@ import java.lang.reflect.Constructor;
 import java.util.ConcurrentModificationException;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -97,11 +99,12 @@ public class MapJoinTableContainerSerDe {
    * Loads the table container from a folder. Only used on Spark path.
    * @param fs FileSystem of the folder.
    * @param folder The folder to load table container.
+   * @param hconf The hive configuration
    * @return Loaded table.
    */
   @SuppressWarnings("unchecked")
-  public MapJoinPersistableTableContainer load(
-      FileSystem fs, Path folder) throws HiveException {
+  public MapJoinTableContainer load(
+      FileSystem fs, Path folder, Configuration hconf) throws HiveException {
     try {
       if (!fs.isDirectory(folder)) {
         throw new HiveException("Error, not a directory: " + folder);
@@ -116,7 +119,10 @@ public class MapJoinTableContainerSerDe {
       Writable keyContainer = keySerDe.getSerializedClass().newInstance();
       Writable valueContainer = valueSerDe.getSerializedClass().newInstance();
 
-      MapJoinPersistableTableContainer tableContainer = null;
+      MapJoinTableContainer tableContainer = null;
+
+      boolean useOptimizedContainer = HiveConf.getBoolVar(
+          hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
 
       for (FileStatus fileStatus: fileStatuses) {
         Path filePath = fileStatus.getPath();
@@ -131,18 +137,16 @@ public class MapJoinTableContainerSerDe {
           String name = in.readUTF();
           Map<String, String> metaData = (Map<String, String>) in.readObject();
           if (tableContainer == null) {
-            tableContainer = create(name, metaData);
+            tableContainer = useOptimizedContainer ?
+                new MapJoinBytesTableContainer(hconf, valueContext, -1, 0) :
+                create(name, metaData);
           }
-          int numKeys = in.readInt();
-          for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
-            MapJoinKeyObject key = new MapJoinKeyObject();
-            key.read(keyContext, in, keyContainer);
-            if (tableContainer.get(key) == null) {
-              tableContainer.put(key, new MapJoinEagerRowContainer());
-            }
-            MapJoinEagerRowContainer values = (MapJoinEagerRowContainer) tableContainer.get(key);
-            values.read(valueContext, in, valueContainer);
-            tableContainer.put(key, values);
+          if (useOptimizedContainer) {
+            loadOptimized((MapJoinBytesTableContainer) tableContainer,
+                in, keyContainer, valueContainer);
+          } else {
+            loadNormal((MapJoinPersistableTableContainer) tableContainer,
+                in, keyContainer, valueContainer);
           }
         } finally {
           if (in != null) {
@@ -152,6 +156,9 @@ public class MapJoinTableContainerSerDe {
           }
         }
       }
+      if (tableContainer != null) {
+        tableContainer.seal();
+      }
       return tableContainer;
     } catch (IOException e) {
       throw new HiveException("IO error while trying to create table container", e);
@@ -160,6 +167,34 @@ public class MapJoinTableContainerSerDe {
     }
   }
 
+  private void loadNormal(MapJoinPersistableTableContainer container,
+      ObjectInputStream in, Writable keyContainer, Writable valueContainer) throws Exception {
+    int numKeys = in.readInt();
+    for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+      MapJoinKeyObject key = new MapJoinKeyObject();
+      key.read(keyContext, in, keyContainer);
+      if (container.get(key) == null) {
+        container.put(key, new MapJoinEagerRowContainer());
+      }
+      MapJoinEagerRowContainer values = (MapJoinEagerRowContainer) container.get(key);
+      values.read(valueContext, in, valueContainer);
+      container.put(key, values);
+    }
+  }
+
+  private void loadOptimized(MapJoinBytesTableContainer container, ObjectInputStream in,
+      Writable key, Writable value) throws Exception {
+    int numKeys = in.readInt();
+    for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+      key.readFields(in);
+      long numRows = in.readLong();
+      for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+        value.readFields(in);
+        container.putRow(keyContext, key, valueContext, value);
+      }
+    }
+  }
+
   public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer)
       throws HiveException {
     int numKeys = tableContainer.size();
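
The loadOptimized loop above spells out the stream layout it expects: an int key count, then per
key a serialized key Writable, a long row count, and that many serialized row Writables
(loadNormal consumes the same files but delegates the per-key part to MapJoinKeyObject and
MapJoinEagerRowContainer). A hypothetical writer producing that layout, purely for illustration
and not the actual persist() code (assumes java.io, java.util and org.apache.hadoop.io.Writable
imports):

    private void writeTable(ObjectOutputStream out,
        Map<Writable, List<Writable>> rowsByKey) throws IOException {
      out.writeInt(rowsByKey.size());            // numKeys, matches in.readInt() in both loaders
      for (Map.Entry<Writable, List<Writable>> e : rowsByKey.entrySet()) {
        e.getKey().write(out);                   // key bytes, matches key.readFields(in)
        out.writeLong(e.getValue().size());      // numRows for this key
        for (Writable row : e.getValue()) {
          row.write(out);                        // one serialized row, matches value.readFields(in)
        }
      }
    }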

http://git-wip-us.apache.org/repos/asf/hive/blob/26eb94fc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
index 1d674e9..10e3497 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -35,6 +36,8 @@ import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -43,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkBucketMapJoinContext;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
@@ -93,10 +97,28 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       }
       FileSystem fs = FileSystem.get(baseDir.toUri(), hconf);
       BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
+      boolean firstContainer = true;
+      boolean useOptimizedContainer = HiveConf.getBoolVar(
+          hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
       for (int pos = 0; pos < mapJoinTables.length; pos++) {
         if (pos == desc.getPosBigTable() || mapJoinTables[pos] != null) {
           continue;
         }
+        if (useOptimizedContainer) {
+          MapJoinObjectSerDeContext keyCtx = mapJoinTableSerdes[pos].getKeyContext();
+          ObjectInspector keyOI = keyCtx.getSerDe().getObjectInspector();
+          if (!MapJoinBytesTableContainer.isSupportedKey(keyOI)) {
+            if (firstContainer) {
+              LOG.warn("Not using optimized table container. " +
+                  "Only a subset of mapjoin keys is supported.");
+              useOptimizedContainer = false;
+              HiveConf.setBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE, false);
+            } else {
+              throw new HiveException("Only a subset of mapjoin keys is supported.");
+            }
+          }
+        }
+        firstContainer = false;
         String bigInputPath = currentInputPath;
         if (currentInputPath != null && mapJoinCtx != null) {
           if (!desc.isBucketMapJoin()) {
@@ -124,14 +146,14 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException {
     LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path);
     if (!SparkUtilities.isDedicatedCluster(hconf)) {
-      return mapJoinTableSerde.load(fs, path);
+      return mapJoinTableSerde.load(fs, path, hconf);
     }
     MapJoinTableContainer mapJoinTable = SmallTableCache.get(path);
     if (mapJoinTable == null) {
       synchronized (path.toString().intern()) {
         mapJoinTable = SmallTableCache.get(path);
         if (mapJoinTable == null) {
-          mapJoinTable = mapJoinTableSerde.load(fs, path);
+          mapJoinTable = mapJoinTableSerde.load(fs, path, hconf);
           SmallTableCache.cache(path, mapJoinTable);
         }
       }
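
Pulling the two new checks together: the Spark loader only picks the optimized container when the
flag is on and MapJoinBytesTableContainer.isSupportedKey accepts the key's ObjectInspector. An
illustrative helper combining the two tests (not part of the patch; uses the same Configuration
and ObjectInspector types imported in HashTableLoader above):

    // Decide whether the optimized byte-based container can be used for a given small table.
    static boolean useOptimizedContainer(Configuration hconf, ObjectInspector keyOI) {
      return HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE)
          && MapJoinBytesTableContainer.isSupportedKey(keyOI);
    }

In the patch itself the decision is stickier than this helper: if the first small table's key is
unsupported, the flag is flipped off in hconf so the SerDe load path and any later tables stay on
the normal container; if an unsupported key shows up on a later table instead, the loader throws,
presumably because earlier tables may already have been loaded with the optimized container.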