You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/05 01:49:08 UTC

svn commit: r1636789 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: HashTableLoaderFactory.java exec/persistence/MapJoinTableContainerSerDe.java exec/spark/HashTableLoader.java

Author: xuefu
Date: Wed Nov  5 00:49:07 2014
New Revision: 1636789

URL: http://svn.apache.org/r1636789
Log:
HIVE-8623: Implement HashTableLoader for Spark map-join [Spark Branch] (Jimmy via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java?rev=1636789&r1=1636788&r2=1636789&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java Wed Nov  5 00:49:07 2014
@@ -34,6 +34,8 @@ public class HashTableLoaderFactory {
   public static HashTableLoader getLoader(Configuration hconf) {
     if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       return new org.apache.hadoop.hive.ql.exec.tez.HashTableLoader();
+    } else if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return new org.apache.hadoop.hive.ql.exec.spark.HashTableLoader();
     } else {
       return new org.apache.hadoop.hive.ql.exec.mr.HashTableLoader();
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1636789&r1=1636788&r2=1636789&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Wed Nov  5 00:49:07 2014
@@ -19,12 +19,16 @@
 package org.apache.hadoop.hive.ql.exec.persistence;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Constructor;
 import java.util.ConcurrentModificationException;
 import java.util.Map;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -86,6 +90,86 @@ public class MapJoinTableContainerSerDe 
       throw new HiveException("Error while trying to create table container", e);
     }
   }
+
+  /**
+   * Loads the table container from a folder. Only used on Spark path.
+   * <p>
+   * Every entry of the folder must be a regular file. Each file is read as a
+   * container name, a metadata map and a counted sequence of key/row pairs.
+   * The container is created from the name and metadata of the first file
+   * read, and the pairs of all files are put into that one container.
+   *
+   * @param fs FileSystem of the folder.
+   * @param folder The folder to load table container.
+   * @return Loaded table, or null if the folder contains no entries.
+   * @throws HiveException if folder is not a directory, contains an entry
+   *         that is not a file, or a file cannot be read.
+   */
+  @SuppressWarnings("unchecked")
+  public MapJoinPersistableTableContainer load(
+      FileSystem fs, Path folder) throws HiveException {
+    try {
+      if (!fs.isDirectory(folder)) {
+        throw new HiveException("Error, not a directory: " + folder);
+      }
+      FileStatus[] fileStatuses = fs.listStatus(folder);
+      if (fileStatuses == null || fileStatuses.length == 0) {
+        return null;
+      }
+
+      SerDe keySerDe = keyContext.getSerDe();
+      SerDe valueSerDe = valueContext.getSerDe();
+      Writable keyContainer = keySerDe.getSerializedClass().newInstance();
+      Writable valueContainer = valueSerDe.getSerializedClass().newInstance();
+
+      MapJoinPersistableTableContainer tableContainer = null;
+
+      // Iterate over the listing that was just validated instead of listing
+      // the folder a second time.
+      for (FileStatus fileStatus: fileStatuses) {
+        Path filePath = fileStatus.getPath();
+        if (!fileStatus.isFile()) {
+          throw new HiveException("Error, not a file: " + filePath);
+        }
+        InputStream is = null;
+        ObjectInputStream in = null;
+        try {
+          is = fs.open(filePath, 4096);
+          in = new ObjectInputStream(is);
+          String name = in.readUTF();
+          Map<String, String> metaData = (Map<String, String>) in.readObject();
+          if (tableContainer == null) {
+            tableContainer = create(name, metaData);
+          }
+          int numKeys = in.readInt();
+          for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+            MapJoinKeyObject key = new MapJoinKeyObject();
+            key.read(keyContext, in, keyContainer);
+            MapJoinEagerRowContainer values = new MapJoinEagerRowContainer();
+            values.read(valueContext, in, valueContainer);
+            tableContainer.put(key, values);
+          }
+        } finally {
+          // Closing the ObjectInputStream also closes the wrapped stream; the raw
+          // stream is closed directly only if the wrapper was never constructed.
+          if (in != null) {
+            in.close();
+          } else if (is != null) {
+            is.close();
+          }
+        }
+      }
+      return tableContainer;
+    } catch (HiveException e) {
+      // Already carries a specific message; do not bury it in a generic wrapper.
+      throw e;
+    } catch (IOException e) {
+      throw new HiveException("IO error while trying to create table container", e);
+    } catch(Exception e) {
+      throw new HiveException("Error while trying to create table container", e);
+    }
+  }
+
   public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer)
       throws HiveException {
     int numKeys = tableContainer.size();

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1636789&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Wed Nov  5 00:49:07 2014
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * HashTableLoader for Spark to load the hashtable for MapJoins.
+ */
+public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+
+  private static final Log LOG = LogFactory.getLog(HashTableLoader.class.getName());
+
+  private ExecMapperContext context;
+  private Configuration hconf;
+
+  private MapJoinOperator joinOp;
+  private MapJoinDesc desc;
+
+  /**
+   * Stores the execution context, configuration and join operator used by
+   * later calls to load. The join descriptor is taken from the operator.
+   */
+  @Override
+  public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
+    this.context = context;
+    this.hconf = hconf;
+    this.joinOp = joinOp;
+    this.desc = joinOp.getConf();
+  }
+
+  /**
+   * Fills the slots of mapJoinTables for the current input path. Tables fed
+   * by direct fetch operators are loaded first; every slot that is still
+   * empty, other than the big table position, is then read back from its
+   * dump folder under the local work's tmp path.
+   *
+   * @param mapJoinTables output array, indexed by table position.
+   * @param mapJoinTableSerdes serdes used to read each position's folder.
+   * @throws HiveException if any table cannot be loaded.
+   */
+  @Override
+  public void load(
+      MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+
+    String currentInputPath = context.getCurrentInputPath().toString();
+    LOG.info("******* Load from HashTable for input file: " + currentInputPath);
+    MapredLocalWork localWork = context.getLocalWork();
+    try {
+      if (localWork.getDirectFetchOp() != null) {
+        loadDirectly(mapJoinTables, currentInputPath);
+      }
+      // All HashTables share the same base dir,
+      // which is passed in as the tmp path
+      Path baseDir = localWork.getTmpPath();
+      if (baseDir == null) {
+        return;
+      }
+      FileSystem fs = FileSystem.get(baseDir.toUri(), hconf);
+      String fileName = localWork.getBucketFileName(currentInputPath);
+      for (int pos = 0; pos < mapJoinTables.length; pos++) {
+        // Skip the big table and any slot already populated (e.g. by loadDirectly).
+        if (pos == desc.getPosBigTable() || mapJoinTables[pos] != null) {
+          continue;
+        }
+        Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte)pos, fileName);
+        LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path);
+        mapJoinTables[pos] = mapJoinTableSerdes[pos].load(fs, path);
+      }
+    } catch (HiveException e) {
+      // Keep the original HiveException rather than wrapping it a second time.
+      throw e;
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  /**
+   * Runs the direct fetch operators registered for this join into a
+   * temporary hash sink and copies the sink's tables into mapJoinTables for
+   * every position whose sink parent is not null.
+   *
+   * @param mapJoinTables output array, indexed by table position.
+   * @param inputFileName value passed to MapredLocalTask.startForward.
+   * @throws Exception if setting up or running the local task fails.
+   */
+  @SuppressWarnings("unchecked")
+  private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFileName)
+      throws Exception {
+    MapredLocalWork localWork = context.getLocalWork();
+    List<Operator<?>> directWorks = localWork.getDirectFetchOp().get(joinOp);
+    if (directWorks == null || directWorks.isEmpty()) {
+      return;
+    }
+    JobConf job = new JobConf(hconf);
+    MapredLocalTask localTask = new MapredLocalTask(localWork, job, false);
+
+    HashTableSinkOperator sink = new TemporaryHashSinkOperator(desc);
+    sink.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>(directWorks));
+
+    for (Operator<?> operator : directWorks) {
+      if (operator != null) {
+        operator.setChildOperators(Arrays.<Operator<? extends OperatorDesc>>asList(sink));
+      }
+    }
+    localTask.setExecContext(context);
+    localTask.startForward(inputFileName);
+
+    MapJoinTableContainer[] tables = sink.getMapJoinTables();
+    for (int i = 0; i < sink.getNumParent(); i++) {
+      if (sink.getParentOperators().get(i) != null) {
+        mapJoinTables[i] = tables[i];
+      }
+    }
+
+    Arrays.fill(tables, null);
+  }
+}