You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/05 01:49:08 UTC
svn commit: r1636789 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql:
HashTableLoaderFactory.java exec/persistence/MapJoinTableContainerSerDe.java
exec/spark/HashTableLoader.java
Author: xuefu
Date: Wed Nov 5 00:49:07 2014
New Revision: 1636789
URL: http://svn.apache.org/r1636789
Log:
HIVE-8623: Implement HashTableLoader for Spark map-join [Spark Branch] (Jimmy via Xuefu)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java?rev=1636789&r1=1636788&r2=1636789&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java Wed Nov 5 00:49:07 2014
@@ -34,6 +34,8 @@ public class HashTableLoaderFactory {
public static HashTableLoader getLoader(Configuration hconf) {
if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
return new org.apache.hadoop.hive.ql.exec.tez.HashTableLoader();
+ } else if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ return new org.apache.hadoop.hive.ql.exec.spark.HashTableLoader();
} else {
return new org.apache.hadoop.hive.ql.exec.mr.HashTableLoader();
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1636789&r1=1636788&r2=1636789&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Wed Nov 5 00:49:07 2014
@@ -19,12 +19,16 @@
package org.apache.hadoop.hive.ql.exec.persistence;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Constructor;
import java.util.ConcurrentModificationException;
import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -86,6 +90,71 @@ public class MapJoinTableContainerSerDe
throw new HiveException("Error while trying to create table container", e);
}
}
+
+ /**
+ * Loads the table container from a folder. Only used on Spark path.
+ * @param fs FileSystem of the folder.
+ * @param folder The folder to load table container.
+ * @return Loaded table.
+ */
+ @SuppressWarnings("unchecked")
+ public MapJoinPersistableTableContainer load(
+ FileSystem fs, Path folder) throws HiveException {
+ try {
+ if (!fs.isDirectory(folder)) {
+ throw new HiveException("Error, not a directory: " + folder);
+ }
+ FileStatus[] fileStatuses = fs.listStatus(folder);
+ if (fileStatuses == null || fileStatuses.length == 0) {
+ return null;
+ }
+
+ SerDe keySerDe = keyContext.getSerDe();
+ SerDe valueSerDe = valueContext.getSerDe();
+ Writable keyContainer = keySerDe.getSerializedClass().newInstance();
+ Writable valueContainer = valueSerDe.getSerializedClass().newInstance();
+
+ MapJoinPersistableTableContainer tableContainer = null;
+
+ for (FileStatus fileStatus: fs.listStatus(folder)) {
+ Path filePath = fileStatus.getPath();
+ if (!fileStatus.isFile()) {
+ throw new HiveException("Error, not a file: " + filePath);
+ }
+ InputStream is = null;
+ ObjectInputStream in = null;
+ try {
+ is = fs.open(filePath, 4096);
+ in = new ObjectInputStream(is);
+ String name = in.readUTF();
+ Map<String, String> metaData = (Map<String, String>) in.readObject();
+ if (tableContainer == null) {
+ tableContainer = create(name, metaData);
+ }
+ int numKeys = in.readInt();
+ for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+ MapJoinKeyObject key = new MapJoinKeyObject();
+ key.read(keyContext, in, keyContainer);
+ MapJoinEagerRowContainer values = new MapJoinEagerRowContainer();
+ values.read(valueContext, in, valueContainer);
+ tableContainer.put(key, values);
+ }
+ } finally {
+ if (in != null) {
+ in.close();
+ } else if (is != null) {
+ is.close();
+ }
+ }
+ }
+ return tableContainer;
+ } catch (IOException e) {
+ throw new HiveException("IO error while trying to create table container", e);
+ } catch(Exception e) {
+ throw new HiveException("Error while trying to create table container", e);
+ }
+ }
+
public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer)
throws HiveException {
int numKeys = tableContainer.size();
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1636789&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Wed Nov 5 00:49:07 2014
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Spark implementation of the map-join hash table loader: fills the small-table
+ * containers of a MapJoinOperator, either by running the direct-fetch operators
+ * in-process or by reading previously dumped hash table files.
+ */
+public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+
+  private static final Log LOG = LogFactory.getLog(HashTableLoader.class.getName());
+
+  // All four fields are assigned in init() and read by load()/loadDirectly().
+  private ExecMapperContext context;
+  private Configuration hconf;
+
+  private MapJoinOperator joinOp;
+  private MapJoinDesc desc;
+
+  /**
+   * Stores the execution context, configuration and join operator for later
+   * use, and caches the operator's MapJoinDesc.
+   */
+  @Override
+  public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
+    this.context = context;
+    this.hconf = hconf;
+    this.joinOp = joinOp;
+    this.desc = joinOp.getConf();
+  }
+
+  /**
+   * Populates the entries of {@code mapJoinTables}.
+   *
+   * <p>If the local work has direct fetch operators, tables are first built
+   * through {@link #loadDirectly}. Afterwards every position that is not the
+   * big table and is still null is loaded from its dump folder under the local
+   * work's tmp path; when that tmp path is null nothing more is loaded.
+   *
+   * @param mapJoinTables table containers to fill, indexed by join position.
+   * @param mapJoinTableSerdes serdes used to load the container of each position.
+   * @throws HiveException wrapping any exception raised while loading.
+   */
+  @Override
+  public void load(
+      MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+
+    final String inputPath = context.getCurrentInputPath().toString();
+    LOG.info("******* Load from HashTable for input file: " + inputPath);
+    final MapredLocalWork work = context.getLocalWork();
+    try {
+      if (work.getDirectFetchOp() != null) {
+        loadDirectly(mapJoinTables, inputPath);
+      }
+
+      // The tmp path is the common base directory of all hash table dumps.
+      final Path tmpDir = work.getTmpPath();
+      if (tmpDir == null) {
+        return;
+      }
+
+      final FileSystem fileSystem = FileSystem.get(tmpDir.toUri(), hconf);
+      final String bucketFile = work.getBucketFileName(inputPath);
+      for (int pos = 0; pos < mapJoinTables.length; pos++) {
+        // Only small-table positions that have not been filled yet are loaded.
+        if (pos != desc.getPosBigTable() && mapJoinTables[pos] == null) {
+          Path dumpPath =
+              Utilities.generatePath(tmpDir, desc.getDumpFilePrefix(), (byte) pos, bucketFile);
+          LOG.info("\tLoad back all hashtable files from tmp folder uri:" + dumpPath);
+          mapJoinTables[pos] = mapJoinTableSerdes[pos].load(fileSystem, dumpPath);
+        }
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  /**
+   * Builds hash tables in-process from the direct fetch operators registered
+   * for this join operator.
+   *
+   * <p>A temporary hash sink operator is attached as the only child of those
+   * operators, a local task forwards the rows of the given input file through
+   * them, and the sink's tables are then copied into {@code mapJoinTables} for
+   * every position whose parent operator is non-null. Does nothing when no
+   * direct fetch operators exist for the join operator.
+   *
+   * @param mapJoinTables table containers to fill, indexed by join position.
+   * @param inputFileName input file handed to the local task.
+   * @throws Exception if setting up or running the local task fails.
+   */
+  @SuppressWarnings("unchecked")
+  private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFileName)
+      throws Exception {
+    final MapredLocalWork work = context.getLocalWork();
+    final List<Operator<?>> fetchOps = work.getDirectFetchOp().get(joinOp);
+    if (fetchOps == null || fetchOps.isEmpty()) {
+      return;
+    }
+
+    final JobConf jobConf = new JobConf(hconf);
+    final MapredLocalTask task = new MapredLocalTask(work, jobConf, false);
+
+    final HashTableSinkOperator hashSink = new TemporaryHashSinkOperator(desc);
+    hashSink.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>(fetchOps));
+
+    // Wire each non-null fetch operator so that it feeds only the temporary sink.
+    for (Operator<?> fetchOp : fetchOps) {
+      if (fetchOp == null) {
+        continue;
+      }
+      fetchOp.setChildOperators(Arrays.<Operator<? extends OperatorDesc>>asList(hashSink));
+    }
+
+    task.setExecContext(context);
+    task.startForward(inputFileName);
+
+    final MapJoinTableContainer[] sinkTables = hashSink.getMapJoinTables();
+    for (int pos = 0; pos < hashSink.getNumParent(); pos++) {
+      if (hashSink.getParentOperators().get(pos) != null) {
+        mapJoinTables[pos] = sinkTables[pos];
+      }
+    }
+
+    // Drop the sink's own references; the containers now live in mapJoinTables.
+    Arrays.fill(sinkTables, null);
+  }
+}