Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/14 00:04:33 UTC

svn commit: r1639547 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/ exec/spark/ optimizer/spark/ plan/

Author: xuefu
Date: Thu Nov 13 23:04:33 2014
New Revision: 1639547

URL: http://svn.apache.org/r1639547
Log:
HIVE-8810: Make HashTableSinkOperator work for Spark [Spark Branch] (Jimmy via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java

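In outline, the new SparkHashTableSinkOperator wraps a plain HashTableSinkOperator by composition: rows are forwarded to the wrapped sink under this operator's own tag, and on close the populated table container for that tag is dumped to a uniquely named file under a shared HDFS folder. A minimal, self-contained sketch of that composition pattern follows (all names are hypothetical; this is not the committed code):

    import java.io.Serializable;

    // Hypothetical sketch of the composition pattern: the Spark-side sink owns a
    // plain sink, forwards rows with its own tag, and dumps that tag's table on close.
    public class DelegatingSinkSketch implements Serializable {
      static class InnerSink implements Serializable {
        private final StringBuilder[] tables = new StringBuilder[4];
        void process(Object row, byte tag) {
          if (tables[tag] == null) tables[tag] = new StringBuilder();
          tables[tag].append(row).append('\n'); // stand-in for building the hashtable
        }
        StringBuilder tableFor(byte tag) { return tables[tag]; }
      }

      private final InnerSink inner = new InnerSink();
      private final byte tag; // position of this small table

      DelegatingSinkSketch(byte tag) { this.tag = tag; }

      void process(Object row) {
        inner.process(row, tag); // forward with our own tag, not the caller's
      }

      void close() {
        // Dump only this operator's table container (stand-in for flushToFile)
        System.out.println("dumping table " + tag + ":\n" + inner.tableFor(tag));
      }

      public static void main(String[] args) {
        DelegatingSinkSketch sink = new DelegatingSinkSketch((byte) 1);
        sink.process("k1,v1");
        sink.process("k2,v2");
        sink.close();
      }
    }
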
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1639547&r1=1639546&r2=1639547&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Thu Nov 13 23:04:33 2014
@@ -31,13 +31,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinPersistableTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
@@ -50,10 +49,11 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.util.ReflectionUtils;
 
-@SuppressWarnings({"rawtypes", "deprecation"})
 public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> implements
     Serializable {
   private static final long serialVersionUID = 1L;
@@ -91,14 +91,10 @@ public class HashTableSinkOperator exten
   private transient List<ObjectInspector>[] joinFilterObjectInspectors;
 
   private transient Byte[] order; // order in which the results should be output
-  private Configuration hconf;
+  protected Configuration hconf;
 
-  // Used as a differentiator for different files
-  // in case multiple files are created for one operator.
-  private int fileIndex = 0;
-
-  private transient MapJoinPersistableTableContainer[] mapJoinTables;
-  private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+  protected transient MapJoinPersistableTableContainer[] mapJoinTables;
+  protected transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
 
   private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
   private static final MapJoinEagerRowContainer EMPTY_ROW_CONTAINER = new MapJoinEagerRowContainer();
@@ -107,7 +103,7 @@ public class HashTableSinkOperator exten
   }
   
   private long rowNumber = 0;
-  private transient LogHelper console;
+  protected transient LogHelper console;
   private long hashTableScale;
   private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
   
@@ -201,24 +197,24 @@ public class HashTableSinkOperator exten
     return mapJoinTables;
   }
 
-//  private static List<ObjectInspector>[] getStandardObjectInspectors(
-//      List<ObjectInspector>[] aliasToObjectInspectors, int maxTag) {
-//    @SuppressWarnings("unchecked")
-//    List<ObjectInspector>[] result = new List[maxTag];
-//    for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
-//      List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
-//      if (oiList == null) {
-//        continue;
-//      }
-//      ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(oiList.size());
-//      for (int i = 0; i < oiList.size(); i++) {
-//        fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i),
-//            ObjectInspectorCopyOption.WRITABLE));
-//      }
-//      result[alias] = fieldOIList;
-//    }
-//    return result;
-//  }
+  private static List<ObjectInspector>[] getStandardObjectInspectors(
+      List<ObjectInspector>[] aliasToObjectInspectors, int maxTag) {
+    @SuppressWarnings("unchecked")
+    List<ObjectInspector>[] result = new List[maxTag];
+    for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
+      List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
+      if (oiList == null) {
+        continue;
+      }
+      ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(oiList.size());
+      for (int i = 0; i < oiList.size(); i++) {
+        fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i),
+            ObjectInspectorCopyOption.WRITABLE));
+      }
+      result[alias] = fieldOIList;
+    }
+    return result;
+  }
 
   /*
   * This operator only processes small tables: read the key/value pairs and load them into the hash table.
@@ -298,22 +295,11 @@ public class HashTableSinkOperator exten
       // get the tmp URI path; it will be an HDFS path if not local mode
       String dumpFilePrefix = conf.getDumpFilePrefix();
       Path path = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
-      FileSystem fs = path.getFileSystem(hconf);
-      short replication = fs.getDefaultReplication(path);
-
-      // For Spark, path is a folder. Let's create it now.
-      if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-        fs.mkdirs(path);  // Create the folder and its parents if not there
-        path = new Path(path, getOperatorId() + "-" + (fileIndex++));
-        // TODO find out numOfPartitions for the big table
-        int numOfPartitions = 10;
-        replication = (short)Math.min(10, numOfPartitions);
-      }
       console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag +
           " with group count: " + tableContainer.size() + " into file: " + path);
       // get the hashtable file and path
-      ObjectOutputStream out = new ObjectOutputStream(
-        new BufferedOutputStream(fs.create(path, replication), 4096));
+      FileSystem fs = path.getFileSystem(hconf);
+      ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
       try {
         mapJoinTableSerdes[tag].persist(out, tableContainer);
       } finally {

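The reinstated getStandardObjectInspectors above standardizes each alias's inspectors with WRITABLE copy semantics, so small-table rows can be copied into the hashtable independently of the input row format. A self-contained illustration of that single call, assuming hive-serde is on the classpath (the wrapper class is hypothetical):

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class StandardOIExample {
      public static void main(String[] args) {
        // A Java-object inspector for strings, as a stand-in for a real input OI
        ObjectInspector javaOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        // Standardize it with WRITABLE copy semantics, as the method above does
        ObjectInspector writableOI = ObjectInspectorUtils.getStandardObjectInspector(
            javaOI, ObjectInspectorCopyOption.WRITABLE);
        System.out.println(writableOI.getTypeName()); // prints: string
      }
    }
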
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1639547&r1=1639546&r2=1639547&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Thu Nov 13 23:04:33 2014
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
@@ -58,14 +62,11 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * OperatorFactory.
  *
@@ -102,6 +103,8 @@ public final class OperatorFactory {
         HashTableDummyOperator.class));
     opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
         HashTableSinkOperator.class));
+    opvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class,
+        SparkHashTableSinkOperator.class));
     opvec.add(new OpTuple<DummyStoreDesc>(DummyStoreDesc.class,
         DummyStoreOperator.class));
     opvec.add(new OpTuple<DemuxDesc>(DemuxDesc.class,

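The registration above is what makes OperatorFactory.get(new SparkHashTableSinkDesc(mjDesc)) return a SparkHashTableSinkOperator even though SparkHashTableSinkDesc extends HashTableSinkDesc: the factory walks a list of (descriptor class, operator class) pairs, and matching on the exact descriptor class, as in the sketch below, lets a subclass descriptor select a different operator. A simplified, runnable sketch of that pattern (toy classes; not the actual factory code):

    import java.util.ArrayList;
    import java.util.List;

    public final class OpRegistrySketch {
      // (descriptor class, operator class) pair, mirroring the OpTuple idea
      static final class OpTuple {
        final Class<?> descClass;
        final Class<?> opClass;
        OpTuple(Class<?> descClass, Class<?> opClass) {
          this.descClass = descClass;
          this.opClass = opClass;
        }
      }

      static final List<OpTuple> OPVEC = new ArrayList<OpTuple>();

      static Object get(Object desc) throws Exception {
        for (OpTuple t : OPVEC) {
          if (t.descClass == desc.getClass()) { // exact match, not isAssignableFrom
            return t.opClass.newInstance();
          }
        }
        throw new IllegalArgumentException("No operator registered for " + desc.getClass());
      }

      public static void main(String[] args) throws Exception {
        OPVEC.add(new OpTuple(String.class, StringBuilder.class)); // toy registration
        System.out.println(get("some descriptor").getClass());     // class java.lang.StringBuilder
      }
    }
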
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1639547&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java Thu Nov 13 23:04:33 2014
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.commons.io.FileExistsException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinPersistableTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+public class SparkHashTableSinkOperator
+    extends TerminalOperator<SparkHashTableSinkDesc> implements Serializable {
+  private static final long serialVersionUID = 1L;
+  protected static final Log LOG = LogFactory.getLog(SparkHashTableSinkOperator.class.getName());
+
+  private HashTableSinkOperator htsOperator;
+
+  // The position of this table
+  private byte tag;
+
+  public SparkHashTableSinkOperator() {
+    htsOperator = new HashTableSinkOperator();
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()];
+    inputOIs[tag] = inputObjInspectors[0];
+    htsOperator.setConf(conf);
+    htsOperator.initialize(hconf, inputOIs);
+  }
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    // Ignore the tag passed in, which is always 0 here; use this table's own tag instead
+    htsOperator.processOp(row, this.tag);
+  }
+
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    try {
+      MapJoinPersistableTableContainer[] mapJoinTables = htsOperator.mapJoinTables;
+      if (mapJoinTables == null || mapJoinTables.length <= tag
+          || mapJoinTables[tag] == null) {
+        LOG.debug("mapJoinTable is null");
+      } else {
+        flushToFile(mapJoinTables[tag], tag);
+      }
+      super.closeOp(abort);
+    } catch (HiveException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
+      byte tag) throws IOException, HiveException {
+    // get tmp file URI
+    Path tmpURI = getExecContext().getLocalWork().getTmpHDFSPath();
+    LOG.info("Temp URI for side table: " + tmpURI);
+    // get current input file name
+    String bigBucketFileName = getExecContext().getCurrentBigBucketFile();
+    String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName);
+    // get the tmp URI path; it will be an HDFS path if not local mode
+    String dumpFilePrefix = conf.getDumpFilePrefix();
+    Path path = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
+    FileSystem fs = path.getFileSystem(htsOperator.getConfiguration());
+    short replication = fs.getDefaultReplication(path);
+
+    // TODO find out numOfPartitions for the big table
+    int numOfPartitions = 10;
+    replication = (short) Math.min(10, numOfPartitions);
+    fs.mkdirs(path);  // Create the folder and its parents if not there
+    Path folder = path;
+    while (true) {
+      // Always derive the candidate from the folder, not from a failed candidate
+      path = new Path(folder, getOperatorId()
+        + "-" + Math.abs(Utilities.randGen.nextInt()));
+      try {
+        // This will guarantee file name uniqueness.
+        // TODO: can we use the task id, which should be unique
+        if (fs.createNewFile(path)) {
+          break;
+        }
+      } catch (FileExistsException e) {
+        // No problem, use a new name
+      }
+    }
+    htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag +
+      " with group count: " + tableContainer.size() + " into file: " + path);
+    // get the hashtable file and path
+    OutputStream os = null;
+    ObjectOutputStream out = null;
+    try {
+      os = fs.create(path, replication);
+      out = new ObjectOutputStream(new BufferedOutputStream(os, 4096));
+      MapJoinTableContainerSerDe mapJoinTableSerde = htsOperator.mapJoinTableSerdes[tag];
+      mapJoinTableSerde.persist(out, tableContainer);
+    } finally {
+      if (out != null) {
+        out.close();
+      } else if (os != null) {
+        os.close();
+      }
+    }
+    tableContainer.clear();
+    FileStatus status = fs.getFileStatus(path);
+    htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path +
+      " (" + status.getLen() + " bytes)");
+  }
+
+  public void setTag(byte tag) {
+    this.tag = tag;
+  }
+
+  /**
+   * Implements the getName function for the Node Interface.
+   *
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return HashTableSinkOperator.getOperatorName();
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.HASHTABLESINK;
+  }
+}

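flushToFile above leans on FileSystem.createNewFile, which atomically creates the file only if it does not already exist, so concurrent sink tasks writing into the same folder cannot clobber each other's dumps. A standalone sketch of that idiom (the folder and prefix are made up for the example):

    import java.io.IOException;
    import java.util.Random;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UniqueFileSketch {
      // Retry with fresh random suffixes until createNewFile atomically wins.
      static Path createUnique(FileSystem fs, Path folder, String prefix)
          throws IOException {
        Random rand = new Random();
        while (true) {
          Path candidate = new Path(folder, prefix + "-" + Math.abs(rand.nextInt()));
          if (fs.createNewFile(candidate)) { // false if another writer got there first
            return candidate;
          }
        }
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        fs.mkdirs(new Path("/tmp/ht-dumps"));
        System.out.println(createUnique(fs, new Path("/tmp/ht-dumps"), "op1"));
      }
    }
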
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1639547&r1=1639546&r2=1639547&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Thu Nov 13 23:04:33 2014
@@ -76,7 +76,7 @@ public class HashTableLoader implements 
       }
       // All HashTables share the same base dir,
       // which is passed in as the tmp path
-      Path baseDir = localWork.getTmpPath();
+      Path baseDir = localWork.getTmpHDFSPath();
       if (baseDir == null) {
         return;
       }

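With every dump now written beneath a single shared folder (tmpHDFSPath), a loader can discover all side-table files by listing that base dir. A hypothetical sketch of such a scan, not the actual loader code:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListDumpsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path baseDir = new Path("/tmp/ht-dumps"); // stand-in for tmpHDFSPath
        FileSystem fs = baseDir.getFileSystem(conf);
        fs.mkdirs(baseDir);
        // Each file under the base dir is one task's hashtable dump
        for (FileStatus status : fs.listStatus(baseDir)) {
          System.out.println("hashtable dump: " + status.getPath());
        }
      }
    }
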
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java?rev=1639547&r1=1639546&r2=1639547&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java Thu Nov 13 23:04:33 2014
@@ -26,15 +26,13 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
-import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -43,12 +41,12 @@ import org.apache.hadoop.hive.ql.parse.s
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
-import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 
@@ -234,9 +232,9 @@ public class SparkReduceSinkMapJoinProc 
     //replace ReduceSinkOp with HashTableSinkOp for the RSops which are parents of MJop
     MapJoinDesc mjDesc = mapJoinOp.getConf();
 
-    HashTableSinkDesc hashTableSinkDesc = new HashTableSinkDesc(mjDesc);
-    HashTableSinkOperator hashTableSinkOp = (HashTableSinkOperator) OperatorFactory
-        .get(hashTableSinkDesc);
+    SparkHashTableSinkDesc hashTableSinkDesc = new SparkHashTableSinkDesc(mjDesc);
+    SparkHashTableSinkOperator hashTableSinkOp =
+      (SparkHashTableSinkOperator) OperatorFactory.get(hashTableSinkDesc);
 
     //get all parents of reduce sink
     List<Operator<? extends OperatorDesc>> RSparentOps = parentRS.getParentOperators();
@@ -244,6 +242,7 @@ public class SparkReduceSinkMapJoinProc 
       parent.replaceChild(parentRS, hashTableSinkOp);
     }
     hashTableSinkOp.setParentOperators(RSparentOps);
+    hashTableSinkOp.setTag((byte)pos);
     return true;
   }
 }

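The new setTag((byte) pos) call records which parent position of the map-join this sink feeds, so at close time each sink dumps only the table container at its own position. A toy illustration of that convention (the table names are invented, and which position holds the big table varies in practice):

    public class TagSketch {
      public static void main(String[] args) {
        String[] parents = {"bigTable", "small1", "small2"};
        for (int pos = 0; pos < parents.length; pos++) {
          if (pos == 0) continue; // position 0 is the big table in this example
          byte tag = (byte) pos;
          System.out.println("sink for " + parents[pos]
              + " dumps mapJoinTables[" + tag + "]");
        }
      }
    }
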
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java?rev=1639547&r1=1639546&r2=1639547&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java Thu Nov 13 23:04:33 2014
@@ -45,6 +45,8 @@ public class MapredLocalWork implements 
   private BucketMapJoinContext bucketMapjoinContext;
   private Path tmpPath;
   private String stageID;
+  // Temp HDFS path for Spark HashTable sink and loader
+  private Path tmpHDFSPath;
 
   private List<Operator<? extends OperatorDesc>> dummyParentOp;
   private Map<MapJoinOperator, List<Operator<? extends OperatorDesc>>> directFetchOp;
@@ -168,6 +170,14 @@ public class MapredLocalWork implements 
     return tmpPath;
   }
 
+  public void setTmpHDFSPath(Path tmpPath) {
+    this.tmpHDFSPath = tmpPath;
+  }
+
+  public Path getTmpHDFSPath() {
+    return tmpHDFSPath;
+  }
+
   public String getBucketFileName(String bigFileName) {
     if (!inputFileChangeSensitive || bigFileName == null || bigFileName.isEmpty()) {
       return "-";

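The field itself is a plain carrier: plan compilation records the shared dump folder once, and both SparkHashTableSinkOperator and the Spark HashTableLoader read it back. A minimal sketch of that round trip, assuming hive-exec on the classpath (the path value is hypothetical, and the code that actually sets it lies outside this commit):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.plan.MapredLocalWork;

    public class TmpHDFSPathSketch {
      public static void main(String[] args) {
        MapredLocalWork localWork = new MapredLocalWork();
        // Planner side (hypothetical path): record where sinks should dump
        localWork.setTmpHDFSPath(new Path("/tmp/hive/ht-dumps"));
        // Sink/loader side: both read back the same base dir
        Path baseDir = localWork.getTmpHDFSPath();
        System.out.println("shared hashtable base dir: " + baseDir);
      }
    }
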
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java?rev=1639547&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java Thu Nov 13 23:04:33 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+/**
+ * Descriptor for the Spark HashTable sink operator.
+ */
+@Explain(displayName = "Spark HashTable Sink Operator")
+public class SparkHashTableSinkDesc extends HashTableSinkDesc {
+  private static final long serialVersionUID = 1L;
+
+  public SparkHashTableSinkDesc() {
+  }
+
+  public SparkHashTableSinkDesc(MapJoinDesc clone) {
+    super(clone);
+  }
+}