Posted to commits@hive.apache.org by om...@apache.org on 2015/11/18 23:41:15 UTC

[33/34] hive git commit: HIVE-12434: Merge branch 'spark' to master

HIVE-12434: Merge branch 'spark' to master


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0a88760f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0a88760f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0a88760f

Branch: refs/heads/master-fixed
Commit: 0a88760f5348f8c8cd9a335f705326ed13df7d8d
Parents: 2c19b7d
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Wed Nov 18 13:38:52 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Wed Nov 18 14:40:11 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    4 +-
 .../src/test/templates/TestHBaseCliDriver.vm    |   63 +-
 .../templates/TestHBaseNegativeCliDriver.vm     |   64 +-
 .../test/resources/testconfiguration.properties |   20 +-
 .../hadoop/hive/hbase/HBaseTestSetup.java       |    9 +-
 pom.xml                                         |   12 +-
 .../hadoop/hive/ql/exec/ScriptOperator.java     |   15 +
 .../persistence/MapJoinTableContainerSerDe.java |   70 +
 .../hive/ql/exec/spark/HashTableLoader.java     |   18 +-
 .../ql/exec/spark/RemoteHiveSparkClient.java    |    4 +-
 .../hive/ql/exec/spark/SparkPlanGenerator.java  |   17 +
 .../hive/ql/exec/spark/SparkUtilities.java      |   10 +-
 .../spark/status/impl/JobMetricsListener.java   |   89 +-
 .../mapjoin/VectorMapJoinCommonOperator.java    |    4 +-
 .../fast/VectorMapJoinFastTableContainer.java   |    2 +-
 .../hive/ql/optimizer/physical/Vectorizer.java  |    4 +-
 .../optimizer/spark/SparkMapJoinOptimizer.java  |   10 +
 .../ql/parse/spark/GenSparkProcContext.java     |    2 -
 .../hive/ql/parse/spark/GenSparkWork.java       |    7 -
 ql/src/test/queries/clientpositive/orc_merge1.q |    2 +
 ql/src/test/queries/clientpositive/orc_merge2.q |    1 +
 ql/src/test/queries/clientpositive/orc_merge3.q |    1 +
 ql/src/test/queries/clientpositive/orc_merge4.q |    2 +
 ql/src/test/queries/clientpositive/orc_merge5.q |    3 +
 ql/src/test/queries/clientpositive/orc_merge6.q |    3 +
 ql/src/test/queries/clientpositive/orc_merge7.q |    3 +
 ql/src/test/queries/clientpositive/orc_merge8.q |    2 +
 ql/src/test/queries/clientpositive/orc_merge9.q |    1 +
 .../clientpositive/orc_merge_incompat1.q        |    1 +
 .../clientpositive/orc_merge_incompat2.q        |    1 +
 .../clientpositive/spark/orc_merge1.q.out       |  485 ++++++
 .../clientpositive/spark/orc_merge2.q.out       |  268 ++++
 .../clientpositive/spark/orc_merge3.q.out       |  207 +++
 .../clientpositive/spark/orc_merge4.q.out       |  231 +++
 .../clientpositive/spark/orc_merge5.q.out       |  334 +++++
 .../clientpositive/spark/orc_merge6.q.out       |  508 +++++++
 .../clientpositive/spark/orc_merge7.q.out       |  619 ++++++++
 .../clientpositive/spark/orc_merge8.q.out       |  130 ++
 .../clientpositive/spark/orc_merge9.q.out       |  186 +++
 .../spark/orc_merge_incompat1.q.out             |  240 +++
 .../spark/orc_merge_incompat2.q.out             |  370 +++++
 .../spark/vector_inner_join.q.out               |  853 +++++++++++
 .../spark/vector_outer_join0.q.out              |  242 +++
 .../spark/vector_outer_join1.q.out              |  631 ++++++++
 .../spark/vector_outer_join2.q.out              |  323 ++++
 .../spark/vector_outer_join3.q.out              |  630 ++++++++
 .../spark/vector_outer_join4.q.out              | 1000 +++++++++++++
 .../spark/vector_outer_join5.q.out              | 1406 ++++++++++++++++++
 ql/src/test/templates/TestCliDriver.vm          |   74 +-
 ql/src/test/templates/TestCompareCliDriver.vm   |   71 +-
 ql/src/test/templates/TestNegativeCliDriver.vm  |   70 +-
 ql/src/test/templates/TestParseNegative.vm      |   65 +-
 .../apache/hive/spark/client/JobContext.java    |    4 +-
 .../hive/spark/client/JobContextImpl.java       |    8 +-
 .../apache/hive/spark/client/RemoteDriver.java  |   60 +-
 .../hive/spark/client/SparkClientImpl.java      |    2 +-
 .../hive/spark/client/SparkClientUtilities.java |   25 +-
 .../ptest2/src/main/resources/batch-exec.vm     |    2 -
 58 files changed, 9000 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7cab9ae..2185f85 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -827,8 +827,8 @@ public class HiveConf extends Configuration {
     HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100, ""),
 
     HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true,
-        "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" +
-        "because memory-optimized hashtable cannot be serialized."),
+        "Whether Hive should use memory-optimized hash table for MapJoin.\n" +
+        "Only works on Tez and Spark, because memory-optimized hashtable cannot be serialized."),
     HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT("hive.mapjoin.optimized.hashtable.probe.percent",
         (float) 0.5, "Probing space percentage of the optimized hashtable"),
     HIVEUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", true, "Whether to use hybrid" +
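
The description change above records that the memory-optimized MapJoin hash table now applies to the Spark execution engine as well as Tez. For reference, a minimal sketch of reading the flag through the HiveConf API that appears in this hunk (illustrative only, not code from this commit):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class MapJoinHashTableFlagSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // hive.mapjoin.optimized.hashtable: per the updated description it is
        // honored on Tez and Spark, since the optimized table cannot be serialized.
        boolean useOptimized =
            HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
        System.out.println("hive.mapjoin.optimized.hashtable = " + useOptimized);
      }
    }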

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/templates/TestHBaseCliDriver.vm b/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
index de0be32..6f4a7c1 100644
--- a/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
+++ b/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
@@ -17,38 +17,25 @@
  */
 package org.apache.hadoop.hive.cli;
 
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.hive.ql.QTestUtil;
 import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
 import org.apache.hadoop.hive.hbase.HBaseQTestUtil;
 import org.apache.hadoop.hive.hbase.HBaseTestSetup;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
 
-public class $className extends TestCase {
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class $className {
 
   private static final String HIVE_ROOT = HBaseQTestUtil.ensurePathEndsInSlash(System.getProperty("hive.root"));
   private HBaseQTestUtil qt;
-  private HBaseTestSetup setup;
+  private static HBaseTestSetup setup = new HBaseTestSetup();
 
-  public static class TestHBaseCliDriverAddTestFromQFiles implements QTestUtil.SuiteAddTestFunctor {
-    public void addTestToSuite(TestSuite suite, Object setup, String tName) {
-      suite.addTest(new $className("testCliDriver_"+tName, (HBaseTestSetup)setup));
-    }
-  }
-
-  public $className(String name, HBaseTestSetup setup) {
-    super(name);
-    qt = null;
-    this.setup = setup;
-  }
-
-  @Override
-  protected void setUp() {
+  @Before
+  public void setUp() {
 
     MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode");
     String initScript = "$initScript";
@@ -65,12 +52,11 @@ public class $className extends TestCase {
     }
   }
 
-  @Override
-  protected void tearDown() {
+  @After
+  public void tearDown() {
     try {
       qt.shutdown();
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       System.err.println("Exception: " + e.getMessage());
       e.printStackTrace();
       System.err.flush();
@@ -78,23 +64,9 @@ public class $className extends TestCase {
     }
   }
 
-  public static Test suite() {
-    Set<String> qFilesToExecute = new HashSet<String>();
-    String qFiles = System.getProperty("qfile", "").trim();
-    if(!qFiles.isEmpty()) {
-      for(String qFile : qFiles.split(",")) {
-        qFile = qFile.trim();
-        if(!qFile.isEmpty()) {
-          qFilesToExecute.add(qFile);
-        }
-      }
-    }
-    TestSuite suite = new TestSuite();
-    HBaseTestSetup setup = new HBaseTestSetup(suite);
-
-    QTestUtil.addTestsToSuiteFromQfileNames("$qFileNamesFile", qFilesToExecute,
-      suite, setup, new TestHBaseCliDriverAddTestFromQFiles());
-    return setup;
+  @AfterClass
+  public static void closeHBaseConnections() throws Exception {
+    setup.tearDown();
   }
 
 #foreach ($qf in $qfiles)
@@ -102,6 +74,7 @@ public class $className extends TestCase {
   #set ($eidx = $fname.indexOf('.'))
   #set ($tname = $fname.substring(0, $eidx))
   #set ($fpath = $qfilesMap.get($fname))
+  @Test
   public void testCliDriver_$tname() throws Exception {
     runTest("$tname", "$fname", (HIVE_ROOT + "$fpath"));
   }
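
The template change above moves the generated HBase CLI driver tests from the JUnit 3 TestCase/TestSuite style to JUnit 4 annotations, with a single static fixture closed in @AfterClass. A minimal, self-contained sketch of that lifecycle shape (class and method names here are placeholders; the real ones come from the Velocity variables $className and $tname):

    import org.junit.After;
    import org.junit.AfterClass;
    import org.junit.Before;
    import org.junit.Test;

    // Mirrors the generated driver: per-test setUp/tearDown plus one
    // @AfterClass hook that closes the shared fixture.
    public class JUnit4LifecycleSketch {
      private static final StringBuilder events = new StringBuilder();

      @Before
      public void setUp() { events.append("setUp;"); }       // was protected setUp() in JUnit 3

      @After
      public void tearDown() { events.append("tearDown;"); } // was protected tearDown()

      @AfterClass
      public static void closeSharedFixture() {              // replaces suite()/TestSetup
        System.out.println(events);
      }

      @Test
      public void testCliDriver_example() { events.append("test;"); }
    }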

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm b/hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm
index b402585..043bd87 100644
--- a/hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm
+++ b/hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm
@@ -18,38 +18,25 @@
 
 package org.apache.hadoop.hive.cli;
 
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.hive.ql.QTestUtil;
 import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
 import org.apache.hadoop.hive.hbase.HBaseQTestUtil;
 import org.apache.hadoop.hive.hbase.HBaseTestSetup;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
 
-public class $className extends TestCase {
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class $className {
 
   private static final String HIVE_ROOT = HBaseQTestUtil.ensurePathEndsInSlash(System.getProperty("hive.root"));
   private HBaseQTestUtil qt;
-  private HBaseTestSetup setup;
-
-  public static class TestHBaseNegativeCliDriverAddTestFromQFiles implements QTestUtil.SuiteAddTestFunctor {
-    public void addTestToSuite(TestSuite suite, Object setup, String tName) {
-      suite.addTest(new $className("testCliDriver_"+tName, (HBaseTestSetup)setup));
-    }
-  }
-
-  public $className(String name, HBaseTestSetup setup) {
-    super(name);
-    qt = null;
-    this.setup = setup;
-  }
+  private static HBaseTestSetup setup = new HBaseTestSetup();
 
-  @Override
-  protected void setUp() {
+  @Before
+  public void setUp() {
 
     MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode");
     String initScript = "$initScript";
@@ -66,12 +53,11 @@ public class $className extends TestCase {
     }
   }
 
-  @Override
-  protected void tearDown() {
+  @After
+  public void tearDown() {
     try {
       qt.shutdown();
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       System.err.println("Exception: " + e.getMessage());
       e.printStackTrace();
       System.err.flush();
@@ -79,24 +65,9 @@ public class $className extends TestCase {
     }
   }
 
-  public static Test suite() {
-    Set<String> qFilesToExecute = new HashSet<String>();
-    String qFiles = System.getProperty("qfile", "").trim();
-    if(!qFiles.isEmpty()) {
-      for(String qFile : qFiles.split(",")) {
-        qFile = qFile.trim();
-        if(!qFile.isEmpty()) {
-          qFilesToExecute.add(qFile);
-        }
-      }
-    }
-
-    TestSuite suite = new TestSuite();
-    HBaseTestSetup setup = new HBaseTestSetup(suite);
-
-    QTestUtil.addTestsToSuiteFromQfileNames("$qFileNamesFile", qFilesToExecute,
-      suite, setup, new TestHBaseNegativeCliDriverAddTestFromQFiles());
-    return setup;
+  @AfterClass
+  public static void closeHBaseConnections() throws Exception {
+    setup.tearDown();
   }
 
 #foreach ($qf in $qfiles)
@@ -104,6 +75,7 @@ public class $className extends TestCase {
   #set ($eidx = $fname.indexOf('.'))
   #set ($tname = $fname.substring(0, $eidx))
   #set ($fpath = $qfilesMap.get($fname))
+  @Test
   public void testCliDriver_$tname() throws Exception {
     runTest("$tname", "$fname", (HIVE_ROOT + "$fpath"));
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index ece43cc..a49a301 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1257,6 +1257,17 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   load_fs2.q,\
   load_hdfs_file_with_space_in_the_name.q,\
   optrstat_groupby.q,\
+  orc_merge1.q,\
+  orc_merge2.q,\
+  orc_merge3.q,\
+  orc_merge4.q,\
+  orc_merge5.q,\
+  orc_merge6.q,\
+  orc_merge7.q,\
+  orc_merge8.q,\
+  orc_merge9.q,\
+  orc_merge_incompat1.q,\
+  orc_merge_incompat2.q,\
   parallel_orderby.q,\
   quotedid_smb.q,\
   reduce_deduplicate.q,\
@@ -1270,7 +1281,14 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   stats_counter_partitioned.q,\
   temp_table_external.q,\
   truncate_column_buckets.q,\
-  uber_reduce.q
+  uber_reduce.q,\
+  vector_inner_join.q,\
+  vector_outer_join0.q,\
+  vector_outer_join1.q,\
+  vector_outer_join2.q,\
+  vector_outer_join3.q,\
+  vector_outer_join4.q,\
+  vector_outer_join5.q
 
 # These tests are removed from miniSparkOnYarn.query.files
 #  ql_rewrite_gbtoidx.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
index 300f1cf..e6383dc 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
@@ -45,7 +45,7 @@ import org.apache.zookeeper.Watcher;
  * HBaseTestSetup defines HBase-specific test fixtures which are
  * reused across testcases.
  */
-public class HBaseTestSetup extends TestSetup {
+public class HBaseTestSetup {
 
   private MiniHBaseCluster hbaseCluster;
   private int zooKeeperPort;
@@ -54,10 +54,6 @@ public class HBaseTestSetup extends TestSetup {
 
   private static final int NUM_REGIONSERVERS = 1;
 
-  public HBaseTestSetup(Test test) {
-    super(test);
-  }
-
   public HConnection getConnection() {
     return this.hbaseConn;
   }
@@ -170,8 +166,7 @@ public class HBaseTestSetup extends TestSetup {
     return port;
   }
 
-  @Override
-  protected void tearDown() throws Exception {
+  public void tearDown() throws Exception {
     if (hbaseConn != null) {
       hbaseConn.close();
       hbaseConn = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4a90cef..c6df4a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,7 +160,7 @@
     <ST4.version>4.0.4</ST4.version>
     <tez.version>0.8.1-alpha</tez.version>
     <super-csv.version>2.2.0</super-csv.version>
-    <spark.version>1.4.0</spark.version>
+    <spark.version>1.5.0</spark.version>
     <scala.binary.version>2.10</scala.binary.version>
     <scala.version>2.10.4</scala.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
@@ -222,16 +222,6 @@
          <enabled>false</enabled>
        </snapshots>
     </repository>
-     <repository>
-       <id>spark-1.3</id>
-       <url>https://s3-us-west-1.amazonaws.com/hive-spark/maven2/spark_2.10-1.3-rc1/</url>
-       <releases>
-         <enabled>true</enabled>
-       </releases>
-       <snapshots>
-         <enabled>false</enabled>
-       </snapshots>
-    </repository>
   </repositories>
 
   <dependencyManagement>

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
index 5df9ea2..63837ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
 import org.apache.spark.SparkFiles;
 
 import java.io.BufferedInputStream;
@@ -329,6 +331,7 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
     // initialize the user's process only when you receive the first row
     if (firstRow) {
       firstRow = false;
+      SparkConf sparkConf = null;
       try {
         String[] cmdArgs = splitArgs(conf.getScriptCmd());
 
@@ -341,6 +344,7 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
 
           // In spark local mode, we need to search added files in root directory.
           if (HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+            sparkConf = SparkEnv.get().conf();
             finder.prependPathComponent(SparkFiles.getRootDirectory());
           }
           File f = finder.getAbsolutePath(prog);
@@ -371,6 +375,17 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
         String idEnvVarVal = getOperatorId();
         env.put(safeEnvVarName(idEnvVarName), idEnvVarVal);
 
+        // For spark, in non-local mode, any added dependencies are stored at
+        // SparkFiles::getRootDirectory, which is the executor's working directory.
+        // In local mode, we need to manually point the process's working directory to it,
+        // in order to make the dependencies accessible.
+        if (sparkConf != null) {
+          String master = sparkConf.get("spark.master");
+          if (master.equals("local") || master.startsWith("local[")) {
+            pb.directory(new File(SparkFiles.getRootDirectory()));
+          }
+        }
+
         scriptPid = pb.start(); // Runtime.getRuntime().exec(wrappedCmdArgs);
 
         DataOutputStream scriptOut = new DataOutputStream(
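
The new block above points the script child process at SparkFiles.getRootDirectory() when Spark runs in local mode, so dependencies added to the job are visible from the process's working directory. A minimal standalone sketch of the same master check and directory switch, assuming an active SparkEnv (the ProcessBuilder is supplied by the caller):

    import java.io.File;
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkEnv;
    import org.apache.spark.SparkFiles;

    public class LocalModeWorkingDirSketch {
      public static ProcessBuilder prepare(ProcessBuilder pb) {
        SparkConf sparkConf = SparkEnv.get().conf();
        String master = sparkConf.get("spark.master");
        // In local mode, added files live under SparkFiles.getRootDirectory(),
        // so run the user's script from that directory.
        if (master.equals("local") || master.startsWith("local[")) {
          pb.directory(new File(SparkFiles.getRootDirectory()));
        }
        return pb;
      }
    }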

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
index e97a9f0..d6deabe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -195,6 +197,74 @@ public class MapJoinTableContainerSerDe {
     }
   }
 
+  /**
+   * Loads the small table into a VectorMapJoinFastTableContainer. Only used on Spark path.
+   * @param mapJoinDesc The descriptor for the map join
+   * @param fs FileSystem of the folder.
+   * @param folder The folder to load table container.
+   * @param hconf The hive configuration
+   * @return Loaded table.
+   */
+  @SuppressWarnings("unchecked")
+  public MapJoinTableContainer loadFastContainer(MapJoinDesc mapJoinDesc,
+      FileSystem fs, Path folder, Configuration hconf) throws HiveException {
+    try {
+      if (!fs.isDirectory(folder)) {
+        throw new HiveException("Error, not a directory: " + folder);
+      }
+      FileStatus[] fileStatuses = fs.listStatus(folder);
+      if (fileStatuses == null || fileStatuses.length == 0) {
+        return null;
+      }
+
+      SerDe keySerDe = keyContext.getSerDe();
+      SerDe valueSerDe = valueContext.getSerDe();
+      Writable key = keySerDe.getSerializedClass().newInstance();
+      Writable value = valueSerDe.getSerializedClass().newInstance();
+
+      VectorMapJoinFastTableContainer tableContainer =
+          new VectorMapJoinFastTableContainer(mapJoinDesc, hconf, -1);
+
+      for (FileStatus fileStatus : fileStatuses) {
+        Path filePath = fileStatus.getPath();
+        if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
+          throw new HiveException("Error, not a file: " + filePath);
+        }
+        InputStream is = null;
+        ObjectInputStream in = null;
+        try {
+          is = fs.open(filePath, 4096);
+          in = new ObjectInputStream(is);
+          // skip the name and metadata
+          in.readUTF();
+          in.readObject();
+          int numKeys = in.readInt();
+          for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+            key.readFields(in);
+            long numRows = in.readLong();
+            for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+              value.readFields(in);
+              tableContainer.putRow(null, key, null, value);
+            }
+          }
+        } finally {
+          if (in != null) {
+            in.close();
+          } else if (is != null) {
+            is.close();
+          }
+        }
+      }
+
+      tableContainer.seal();
+      return tableContainer;
+    } catch (IOException e) {
+      throw new HiveException("IO error while trying to create table container", e);
+    } catch (Exception e) {
+      throw new HiveException("Error while trying to create table container", e);
+    }
+  }
+
   public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer)
       throws HiveException {
     int numKeys = tableContainer.size();
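
The loadFastContainer method added above is consumed from the Spark HashTableLoader shown later in this commit. A minimal sketch of the call, where desc, fs, folder, and hconf stand in for values supplied by the MapJoin setup:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.MapJoinDesc;

    public class FastContainerLoadSketch {
      public static MapJoinTableContainer load(MapJoinTableContainerSerDe serde,
          MapJoinDesc desc, FileSystem fs, Path folder, Configuration hconf)
          throws HiveException {
        // Reads every small-table file under 'folder' into a
        // VectorMapJoinFastTableContainer and seals it for probing.
        return serde.loadFastContainer(desc, fs, folder, hconf);
      }
    }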

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
index 39f9d40..7ada611 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkBucketMapJoinContext;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -62,6 +63,8 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
   private MapJoinOperator joinOp;
   private MapJoinDesc desc;
 
+  private boolean useFastContainer = false;
+
   @Override
   public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
       MapJoinOperator joinOp) {
@@ -69,6 +72,12 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
     this.hconf = hconf;
     this.joinOp = joinOp;
     this.desc = joinOp.getConf();
+    if (desc.getVectorMode() && HiveConf.getBoolVar(
+        hconf, HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
+      VectorMapJoinDesc vectorDesc = desc.getVectorDesc();
+      useFastContainer = vectorDesc != null && vectorDesc.hashTableImplementationType() ==
+          VectorMapJoinDesc.HashTableImplementationType.FAST;
+    }
   }
 
   @Override
@@ -98,7 +107,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       FileSystem fs = FileSystem.get(baseDir.toUri(), hconf);
       BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
       boolean firstContainer = true;
-      boolean useOptimizedContainer = HiveConf.getBoolVar(
+      boolean useOptimizedContainer = !useFastContainer && HiveConf.getBoolVar(
           hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
       for (int pos = 0; pos < mapJoinTables.length; pos++) {
         if (pos == desc.getPosBigTable() || mapJoinTables[pos] != null) {
@@ -146,14 +155,17 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException {
     LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path);
     if (!SparkUtilities.isDedicatedCluster(hconf)) {
-      return mapJoinTableSerde.load(fs, path, hconf);
+      return useFastContainer ? mapJoinTableSerde.loadFastContainer(desc, fs, path, hconf) :
+          mapJoinTableSerde.load(fs, path, hconf);
     }
     MapJoinTableContainer mapJoinTable = SmallTableCache.get(path);
     if (mapJoinTable == null) {
       synchronized (path.toString().intern()) {
         mapJoinTable = SmallTableCache.get(path);
         if (mapJoinTable == null) {
-          mapJoinTable = mapJoinTableSerde.load(fs, path, hconf);
+          mapJoinTable = useFastContainer ?
+              mapJoinTableSerde.loadFastContainer(desc, fs, path, hconf) :
+              mapJoinTableSerde.load(fs, path, hconf);
           SmallTableCache.cache(path, mapJoinTable);
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 86b9d67..c4cb2ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -295,11 +295,11 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
 
       // Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
       // may need to load classes from this jar in other threads.
-      Set<String> addedJars = jc.getAddedJars();
+      Map<String, Long> addedJars = jc.getAddedJars();
       if (addedJars != null && !addedJars.isEmpty()) {
         SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
         KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader());
-        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
       }
 
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 6951993..6abef4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +26,8 @@ import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
@@ -228,6 +231,20 @@ public class SparkPlanGenerator {
     byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
     boolean caching = isCachingWork(work, sparkWork);
     if (work instanceof MapWork) {
+      // Create tmp dir for MergeFileWork
+      if (work instanceof MergeFileWork) {
+        Path outputPath = ((MergeFileWork) work).getOutputDir();
+        Path tempOutPath = Utilities.toTempPath(outputPath);
+        FileSystem fs = outputPath.getFileSystem(jobConf);
+        try {
+          if (!fs.exists(tempOutPath)) {
+            fs.mkdirs(tempOutPath);
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(
+              "Can't make path " + outputPath + " : " + e.getMessage());
+        }
+      }
       MapTran mapTran = new MapTran(caching);
       HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
       mapTran.setMapFunction(mapFunc);

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index cf2c3bc..0268469 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -22,8 +22,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
-import java.util.UUID;
-import java.util.Collection;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
@@ -91,11 +89,11 @@ public class SparkUtilities {
    */
   public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
     Path localFile = new Path(source.getPath());
-    // give the uploaded file a UUID
-    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf),
-        UUID.randomUUID() + "-" + getFileName(source));
+    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source));
     FileSystem fileSystem = FileSystem.get(conf);
-    fileSystem.copyFromLocalFile(localFile, remoteFile);
+    // Overwrite if the remote file already exists. Whether the file can be added
+    // on executor is up to spark, i.e. spark.files.overwrite
+    fileSystem.copyFromLocalFile(false, true, localFile, remoteFile);
     Path fullPath = fileSystem.getFileStatus(remoteFile).getPath();
     return fullPath.toUri();
   }
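
The change above drops the per-upload UUID and instead overwrites any existing copy in the HDFS session directory; whether executors accept a replaced file is left to Spark's spark.files.overwrite setting. A minimal sketch of the overwrite-style copy using the Hadoop FileSystem API from this hunk (the paths are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OverwriteCopySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path localFile = new Path("/tmp/example.jar");                  // placeholder local path
        Path remoteFile = new Path("/user/hive/session/example.jar");   // placeholder session path
        // copyFromLocalFile(delSrc=false, overwrite=true, src, dst):
        // keep the local copy and replace the remote file if it already exists.
        fs.copyFromLocalFile(false, true, localFile, remoteFile);
        System.out.println("Uploaded to " + fs.getFileStatus(remoteFile).getPath().toUri());
      }
    }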

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index 84603d5..09c54c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -23,29 +23,16 @@ import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.spark.JavaSparkListener;
 import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
-import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
-import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
-import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
 import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
 import org.apache.spark.scheduler.SparkListenerTaskEnd;
-import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
-import org.apache.spark.scheduler.SparkListenerTaskStart;
-import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
-import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
-import org.apache.spark.scheduler.SparkListenerExecutorAdded;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-public class JobMetricsListener implements SparkListener {
+public class JobMetricsListener extends JavaSparkListener {
 
   private static final Logger LOG = LoggerFactory.getLogger(JobMetricsListener.class);
 
@@ -54,36 +41,6 @@ public class JobMetricsListener implements SparkListener {
   private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
 
   @Override
-  public void onExecutorRemoved(SparkListenerExecutorRemoved removed) {
-
-  }
-
-  @Override
-  public void onExecutorAdded(SparkListenerExecutorAdded added) {
-
-  }
-
-  @Override
-  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
-
-  }
-
-  @Override
-  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
-
-  }
-
-  @Override
-  public void onTaskStart(SparkListenerTaskStart taskStart) {
-
-  }
-
-  @Override
-  public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
-
-  }
-
-  @Override
   public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
     int stageId = taskEnd.stageId();
     int stageAttemptId = taskEnd.stageAttemptId();
@@ -119,46 +76,6 @@ public class JobMetricsListener implements SparkListener {
     jobIdToStageId.put(jobId, intStageIds);
   }
 
-  @Override
-  public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
-
-  }
-
-  @Override
-  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
-
-  }
-
-  @Override
-  public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
-
-  }
-
-  @Override
-  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
-
-  }
-
-  @Override
-  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
-
-  }
-
-  @Override
-  public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
-
-  }
-
-  @Override
-  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
-
-  }
-
-  @Override
-  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
-
-  }
-
   public synchronized  Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
     return allJobMetrics.get(jobId);
   }
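
By extending JavaSparkListener instead of implementing SparkListener directly, the listener above no longer needs the empty callback overrides removed in this hunk; only the events of interest are overridden. A minimal sketch of a listener in that style (the metric bookkeeping is a placeholder):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.JavaSparkListener;
    import org.apache.spark.executor.TaskMetrics;
    import org.apache.spark.scheduler.SparkListenerTaskEnd;

    public class TaskMetricsListenerSketch extends JavaSparkListener {
      private final List<TaskMetrics> metrics = new ArrayList<TaskMetrics>();

      // Only this callback is overridden; JavaSparkListener supplies
      // no-op implementations for every other SparkListener event.
      @Override
      public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        metrics.add(taskEnd.taskMetrics());
      }
    }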

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 6ecfaf7..2502ae2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -526,7 +526,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
       break;
     case FAST:
       // Use our specialized hash table loader.
-      hashTableLoader = new VectorMapJoinFastHashTableLoader();
+      hashTableLoader = HiveConf.getVar(
+          hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark") ?
+          HashTableLoaderFactory.getLoader(hconf) : new VectorMapJoinFastHashTableLoader();
       break;
     default:
       throw new RuntimeException("Unknown vector map join hash table implementation type " + hashTableImplementationType.name());

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index bcfc807..bd4a595 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -195,7 +195,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
 
   @Override
   public void clear() {
-    throw new RuntimeException("Not applicable");
+    // Do nothing
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index ce49b24..a842649 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -181,6 +181,7 @@ public class Vectorizer implements PhysicalPlanResolver {
   Set<String> supportedAggregationUdfs = new HashSet<String>();
 
   private HiveConf hiveConf;
+  private boolean isSpark;
 
   public Vectorizer() {
 
@@ -1163,6 +1164,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       LOG.info("Vectorization is disabled");
       return physicalContext;
     }
+    isSpark = (HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark"));
     // create dispatcher and graph walker
     Dispatcher disp = new VectorizationDispatcher(physicalContext);
     TaskGraphWalker ogw = new TaskGraphWalker(disp);
@@ -2120,7 +2122,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       case MAPJOIN:
         {
           MapJoinDesc desc = (MapJoinDesc) op.getConf();
-          boolean specialize = canSpecializeMapJoin(op, desc, isTez);
+          boolean specialize = canSpecializeMapJoin(op, desc, isTez || isSpark);
 
           if (!specialize) {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 4d2b8d6..005fad2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 
 /**
  * SparkMapJoinOptimizer cloned from ConvertJoinMapJoin is an optimization that replaces a common join
@@ -89,6 +91,14 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
 
     LOG.info("Convert to non-bucketed map join");
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
+    // For native vectorized map join, we require the key SerDe to be BinarySortableSerDe
+    // Note: the MJ may not really get natively-vectorized later,
+    // but changing SerDe won't hurt correctness
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED) &&
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+      mapJoinOp.getConf().getKeyTblDesc().getProperties().setProperty(
+          serdeConstants.SERIALIZATION_LIB, BinarySortableSerDe.class.getName());
+    }
     if (conf.getBoolVar(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
       LOG.info("Check if it can be converted to bucketed map join");
       numBuckets = convertJoinBucketMapJoin(joinOp, mapJoinOp,

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index 0a0c791..62237e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -129,7 +129,6 @@ public class GenSparkProcContext implements NodeProcessorCtx {
   public final Map<Operator<?>, BaseWork> unionWorkMap;
   public final List<UnionOperator> currentUnionOperators;
   public final Set<BaseWork> workWithUnionOperators;
-  public final Set<ReduceSinkOperator> clonedReduceSinks;
 
   public final Set<FileSinkOperator> fileSinkSet;
   public final Map<FileSinkOperator, List<FileSinkOperator>> fileSinkMap;
@@ -180,7 +179,6 @@ public class GenSparkProcContext implements NodeProcessorCtx {
     this.unionWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.currentUnionOperators = new LinkedList<UnionOperator>();
     this.workWithUnionOperators = new LinkedHashSet<BaseWork>();
-    this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
     this.fileSinkMap = new LinkedHashMap<FileSinkOperator, List<FileSinkOperator>>();
     this.pruningSinkSet = new LinkedHashSet<Operator<?>>();

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
index 7a7b558..ea5e414 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
@@ -94,12 +94,6 @@ public class GenSparkWork implements NodeProcessor {
     LOG.debug("Root operator: " + root);
     LOG.debug("Leaf operator: " + operator);
 
-    if (context.clonedReduceSinks.contains(operator)) {
-      // if we're visiting a terminal we've created ourselves,
-      // just skip and keep going
-      return null;
-    }
-
     SparkWork sparkWork = context.currentTask.getWork();
     SMBMapJoinOperator smbOp = GenSparkUtils.getChildOperator(root, SMBMapJoinOperator.class);
 
@@ -192,7 +186,6 @@ public class GenSparkWork implements NodeProcessor {
                   // we've already set this one up. Need to clone for the next work.
                   r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
                       (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators());
-                  context.clonedReduceSinks.add(r);
                 }
                 r.getConf().setOutputName(work.getName());
               }

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge1.q b/ql/src/test/queries/clientpositive/orc_merge1.q
index a8ac85b..afef1e5 100644
--- a/ql/src/test/queries/clientpositive/orc_merge1.q
+++ b/ql/src/test/queries/clientpositive/orc_merge1.q
@@ -10,6 +10,7 @@ set tez.grouping.max-size=2000;
 set hive.merge.tezfiles=false;
 set hive.merge.mapfiles=false;
 set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
 
 -- SORT_QUERY_RESULTS
 
@@ -39,6 +40,7 @@ dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/orcfile_merge1/ds=1/part=0/;
 set hive.merge.tezfiles=true;
 set hive.merge.mapfiles=true;
 set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
 -- auto-merge slow way
 EXPLAIN
     INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge2.q b/ql/src/test/queries/clientpositive/orc_merge2.q
index 44ef280..6d229f1 100644
--- a/ql/src/test/queries/clientpositive/orc_merge2.q
+++ b/ql/src/test/queries/clientpositive/orc_merge2.q
@@ -2,6 +2,7 @@ set hive.explain.user=false;
 set hive.merge.orcfile.stripe.level=true;
 set hive.exec.dynamic.partition=true;
 set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.merge.sparkfiles=true;
 
 DROP TABLE orcfile_merge2a;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge3.q b/ql/src/test/queries/clientpositive/orc_merge3.q
index 9722e6d..f5600c6 100644
--- a/ql/src/test/queries/clientpositive/orc_merge3.q
+++ b/ql/src/test/queries/clientpositive/orc_merge3.q
@@ -1,5 +1,6 @@
 set hive.explain.user=false;
 set hive.merge.orcfile.stripe.level=true;
+set hive.merge.sparkfiles=true;
 
 DROP TABLE orcfile_merge3a;
 DROP TABLE orcfile_merge3b;

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge4.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge4.q b/ql/src/test/queries/clientpositive/orc_merge4.q
index 3b50465..536e717 100644
--- a/ql/src/test/queries/clientpositive/orc_merge4.q
+++ b/ql/src/test/queries/clientpositive/orc_merge4.q
@@ -9,12 +9,14 @@ CREATE TABLE orcfile_merge3a (key int, value string)
 CREATE TABLE orcfile_merge3b (key int, value string) STORED AS TEXTFILE;
 
 set hive.merge.mapfiles=false;
+set hive.merge.sparkfiles=false;
 INSERT OVERWRITE TABLE orcfile_merge3a PARTITION (ds='1')
     SELECT * FROM src;
 
 dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/orcfile_merge3a/ds=1/;
 
 set hive.merge.mapfiles=true;
+set hive.merge.sparkfiles=true;
 INSERT OVERWRITE TABLE orcfile_merge3a PARTITION (ds='1')
     SELECT * FROM src;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge5.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge5.q b/ql/src/test/queries/clientpositive/orc_merge5.q
index 3d32875..c24c407 100644
--- a/ql/src/test/queries/clientpositive/orc_merge5.q
+++ b/ql/src/test/queries/clientpositive/orc_merge5.q
@@ -17,6 +17,7 @@ set hive.merge.mapredfiles=false;
 set hive.compute.splits.in.am=true;
 set tez.grouping.min-size=1000;
 set tez.grouping.max-size=50000;
+set hive.merge.sparkfiles=false;
 
 -- 3 mappers
 explain insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
@@ -31,6 +32,7 @@ set hive.merge.orcfile.stripe.level=true;
 set hive.merge.tezfiles=true;
 set hive.merge.mapfiles=true;
 set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
 
 -- 3 mappers
 explain insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
@@ -45,6 +47,7 @@ set hive.merge.orcfile.stripe.level=false;
 set hive.merge.tezfiles=false;
 set hive.merge.mapfiles=false;
 set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
 
 insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
 analyze table orc_merge5b compute statistics noscan;

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge6.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge6.q b/ql/src/test/queries/clientpositive/orc_merge6.q
index 6bdaa9e..1612a8b 100644
--- a/ql/src/test/queries/clientpositive/orc_merge6.q
+++ b/ql/src/test/queries/clientpositive/orc_merge6.q
@@ -18,6 +18,7 @@ set hive.merge.mapredfiles=false;
 set hive.compute.splits.in.am=true;
 set tez.grouping.min-size=1000;
 set tez.grouping.max-size=50000;
+set hive.merge.sparkfiles=false;
 
 -- 3 mappers
 explain insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
@@ -36,6 +37,7 @@ set hive.merge.orcfile.stripe.level=true;
 set hive.merge.tezfiles=true;
 set hive.merge.mapfiles=true;
 set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
 
 -- 3 mappers
 explain insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
@@ -54,6 +56,7 @@ set hive.merge.orcfile.stripe.level=false;
 set hive.merge.tezfiles=false;
 set hive.merge.mapfiles=false;
 set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
 
 insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
 insert overwrite table orc_merge5a partition (year="2001",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge7.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge7.q b/ql/src/test/queries/clientpositive/orc_merge7.q
index 7a351c6..49b81bf 100644
--- a/ql/src/test/queries/clientpositive/orc_merge7.q
+++ b/ql/src/test/queries/clientpositive/orc_merge7.q
@@ -22,6 +22,7 @@ set tez.grouping.max-size=50000;
 set hive.exec.dynamic.partition=true;
 set hive.exec.dynamic.partition.mode=nonstrict;
 set hive.optimize.sort.dynamic.partition=false;
+set hive.merge.sparkfiles=false;
 
 -- 3 mappers
 explain insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
@@ -40,6 +41,7 @@ set hive.merge.orcfile.stripe.level=true;
 set hive.merge.tezfiles=true;
 set hive.merge.mapfiles=true;
 set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
 
 -- 3 mappers
 explain insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
@@ -58,6 +60,7 @@ set hive.merge.orcfile.stripe.level=false;
 set hive.merge.tezfiles=false;
 set hive.merge.mapfiles=false;
 set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
 
 insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
 insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge8.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge8.q b/ql/src/test/queries/clientpositive/orc_merge8.q
index 61ea4bf..30a892b 100644
--- a/ql/src/test/queries/clientpositive/orc_merge8.q
+++ b/ql/src/test/queries/clientpositive/orc_merge8.q
@@ -30,6 +30,7 @@ set hive.merge.orcfile.stripe.level=false;
 set hive.merge.tezfiles=false;
 set hive.merge.mapfiles=false;
 set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
 
 insert overwrite table alltypes_orc select * from alltypes;
 insert into table alltypes_orc select * from alltypes;
@@ -40,6 +41,7 @@ set hive.merge.orcfile.stripe.level=true;
 set hive.merge.tezfiles=true;
 set hive.merge.mapfiles=true;
 set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
 
 alter table alltypes_orc concatenate;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge9.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge9.q b/ql/src/test/queries/clientpositive/orc_merge9.q
index 010b5a1..5f387ba 100644
--- a/ql/src/test/queries/clientpositive/orc_merge9.q
+++ b/ql/src/test/queries/clientpositive/orc_merge9.q
@@ -15,6 +15,7 @@ set hive.merge.orcfile.stripe.level=true;
 set hive.merge.tezfiles=true;
 set hive.merge.mapfiles=true;
 set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
 
 select count(*) from ts_merge;
 alter table ts_merge concatenate;

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge_incompat1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge_incompat1.q b/ql/src/test/queries/clientpositive/orc_merge_incompat1.q
index dd58524..b9f6246 100644
--- a/ql/src/test/queries/clientpositive/orc_merge_incompat1.q
+++ b/ql/src/test/queries/clientpositive/orc_merge_incompat1.q
@@ -10,6 +10,7 @@ SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
 set hive.merge.orcfile.stripe.level=false;
 set hive.merge.mapfiles=false;
 set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
 
 -- 3 mappers
 explain insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge_incompat2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge_incompat2.q b/ql/src/test/queries/clientpositive/orc_merge_incompat2.q
index a8f8842..11d16c2 100644
--- a/ql/src/test/queries/clientpositive/orc_merge_incompat2.q
+++ b/ql/src/test/queries/clientpositive/orc_merge_incompat2.q
@@ -22,6 +22,7 @@ set tez.am.grouping.max-size=50000;
 set hive.exec.dynamic.partition=true;
 set hive.exec.dynamic.partition.mode=nonstrict;
 set hive.optimize.sort.dynamic.partition=false;
+set hive.merge.sparkfiles=false;
 
 explain insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
 set hive.exec.orc.default.row.index.stride=1000;

http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/results/clientpositive/spark/orc_merge1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/orc_merge1.q.out b/ql/src/test/results/clientpositive/spark/orc_merge1.q.out
new file mode 100644
index 0000000..86df0a7
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/orc_merge1.q.out
@@ -0,0 +1,485 @@
+PREHOOK: query: -- SORT_QUERY_RESULTS
+
+DROP TABLE orcfile_merge1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- SORT_QUERY_RESULTS
+
+DROP TABLE orcfile_merge1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE orcfile_merge1b
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE orcfile_merge1b
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE orcfile_merge1c
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE orcfile_merge1c
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE orcfile_merge1 (key INT, value STRING) 
+    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orcfile_merge1
+POSTHOOK: query: CREATE TABLE orcfile_merge1 (key INT, value STRING) 
+    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orcfile_merge1
+PREHOOK: query: CREATE TABLE orcfile_merge1b (key INT, value STRING) 
+    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orcfile_merge1b
+POSTHOOK: query: CREATE TABLE orcfile_merge1b (key INT, value STRING) 
+    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orcfile_merge1b
+PREHOOK: query: CREATE TABLE orcfile_merge1c (key INT, value STRING) 
+    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orcfile_merge1c
+POSTHOOK: query: CREATE TABLE orcfile_merge1c (key INT, value STRING) 
+    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orcfile_merge1c
+PREHOOK: query: -- merge disabled
+EXPLAIN
+    INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part)
+        SELECT key, value, PMOD(HASH(key), 2) as part
+        FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: -- merge disabled
+EXPLAIN
+    INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part)
+        SELECT key, value, PMOD(HASH(key), 2) as part
+        FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                          serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                          name: default.orcfile_merge1
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 1
+            part 
+          replace: true
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.orcfile_merge1
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+PREHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part)
+    SELECT key, value, PMOD(HASH(key), 2) as part
+    FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orcfile_merge1@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part)
+    SELECT key, value, PMOD(HASH(key), 2) as part
+    FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orcfile_merge1@ds=1/part=0
+POSTHOOK: Output: default@orcfile_merge1@ds=1/part=1
+POSTHOOK: Lineage: orcfile_merge1 PARTITION(ds=1,part=0).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1 PARTITION(ds=1,part=0).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1 PARTITION(ds=1,part=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1 PARTITION(ds=1,part=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+Found 2 items
+#### A masked pattern was here ####
+PREHOOK: query: -- auto-merge slow way
+EXPLAIN
+    INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
+        SELECT key, value, PMOD(HASH(key), 2) as part
+        FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: -- auto-merge slow way
+EXPLAIN
+    INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
+        SELECT key, value, PMOD(HASH(key), 2) as part
+        FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-7 depends on stages: Stage-1 , consists of Stage-4, Stage-3, Stage-5
+  Stage-4
+  Stage-0 depends on stages: Stage-4, Stage-3, Stage-6
+  Stage-2 depends on stages: Stage-0
+  Stage-3
+  Stage-5
+  Stage-6 depends on stages: Stage-5
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                          serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                          name: default.orcfile_merge1b
+
+  Stage: Stage-7
+    Conditional Operator
+
+  Stage: Stage-4
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 1
+            part 
+          replace: true
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.orcfile_merge1b
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work 
+            Map Operator Tree:
+                TableScan
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                        serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                        name: default.orcfile_merge1b
+
+  Stage: Stage-5
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work 
+            Map Operator Tree:
+                TableScan
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                        serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                        name: default.orcfile_merge1b
+
+  Stage: Stage-6
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+PREHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
+    SELECT key, value, PMOD(HASH(key), 2) as part
+    FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orcfile_merge1b@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
+    SELECT key, value, PMOD(HASH(key), 2) as part
+    FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orcfile_merge1b@ds=1/part=0
+POSTHOOK: Output: default@orcfile_merge1b@ds=1/part=1
+POSTHOOK: Lineage: orcfile_merge1b PARTITION(ds=1,part=0).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1b PARTITION(ds=1,part=0).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1b PARTITION(ds=1,part=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1b PARTITION(ds=1,part=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+Found 1 items
+#### A masked pattern was here ####
+PREHOOK: query: -- auto-merge fast way
+EXPLAIN
+    INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part)
+        SELECT key, value, PMOD(HASH(key), 2) as part
+        FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: -- auto-merge fast way
+EXPLAIN
+    INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part)
+        SELECT key, value, PMOD(HASH(key), 2) as part
+        FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-7 depends on stages: Stage-1 , consists of Stage-4, Stage-3, Stage-5
+  Stage-4
+  Stage-0 depends on stages: Stage-4, Stage-3, Stage-6
+  Stage-2 depends on stages: Stage-0
+  Stage-3
+  Stage-5
+  Stage-6 depends on stages: Stage-5
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                          serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                          name: default.orcfile_merge1c
+
+  Stage: Stage-7
+    Conditional Operator
+
+  Stage: Stage-4
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 1
+            part 
+          replace: true
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.orcfile_merge1c
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work 
+          Merge File Operator
+            Map Operator Tree:
+                ORC File Merge Operator
+            merge level: stripe
+            input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+
+  Stage: Stage-5
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work 
+          Merge File Operator
+            Map Operator Tree:
+                ORC File Merge Operator
+            merge level: stripe
+            input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+
+  Stage: Stage-6
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+PREHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part)
+    SELECT key, value, PMOD(HASH(key), 2) as part
+    FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orcfile_merge1c@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part)
+    SELECT key, value, PMOD(HASH(key), 2) as part
+    FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orcfile_merge1c@ds=1/part=0
+POSTHOOK: Output: default@orcfile_merge1c@ds=1/part=1
+POSTHOOK: Lineage: orcfile_merge1c PARTITION(ds=1,part=0).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1c PARTITION(ds=1,part=0).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1c PARTITION(ds=1,part=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1c PARTITION(ds=1,part=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+Found 1 items
+#### A masked pattern was here ####
+PREHOOK: query: -- Verify
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+    FROM orcfile_merge1 WHERE ds='1'
+) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1
+PREHOOK: Input: default@orcfile_merge1@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: -- Verify
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+    FROM orcfile_merge1 WHERE ds='1'
+) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1
+POSTHOOK: Input: default@orcfile_merge1@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1@ds=1/part=1
+#### A masked pattern was here ####
+-21975308766
+PREHOOK: query: SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+    FROM orcfile_merge1b WHERE ds='1'
+) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1b
+PREHOOK: Input: default@orcfile_merge1b@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1b@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+    FROM orcfile_merge1b WHERE ds='1'
+) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1b
+POSTHOOK: Input: default@orcfile_merge1b@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1b@ds=1/part=1
+#### A masked pattern was here ####
+-21975308766
+PREHOOK: query: SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+    FROM orcfile_merge1c WHERE ds='1'
+) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1c
+PREHOOK: Input: default@orcfile_merge1c@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1c@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+    FROM orcfile_merge1c WHERE ds='1'
+) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1c
+POSTHOOK: Input: default@orcfile_merge1c@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1c@ds=1/part=1
+#### A masked pattern was here ####
+-21975308766
+PREHOOK: query: select count(*) from orcfile_merge1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1
+PREHOOK: Input: default@orcfile_merge1@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orcfile_merge1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1
+POSTHOOK: Input: default@orcfile_merge1@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1@ds=1/part=1
+#### A masked pattern was here ####
+500
+PREHOOK: query: select count(*) from orcfile_merge1b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1b
+PREHOOK: Input: default@orcfile_merge1b@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1b@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orcfile_merge1b
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1b
+POSTHOOK: Input: default@orcfile_merge1b@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1b@ds=1/part=1
+#### A masked pattern was here ####
+500
+PREHOOK: query: select count(*) from orcfile_merge1c
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1c
+PREHOOK: Input: default@orcfile_merge1c@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1c@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orcfile_merge1c
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1c
+POSTHOOK: Input: default@orcfile_merge1c@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1c@ds=1/part=1
+#### A masked pattern was here ####
+500
+PREHOOK: query: DROP TABLE orcfile_merge1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@orcfile_merge1
+PREHOOK: Output: default@orcfile_merge1
+POSTHOOK: query: DROP TABLE orcfile_merge1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@orcfile_merge1
+POSTHOOK: Output: default@orcfile_merge1
+PREHOOK: query: DROP TABLE orcfile_merge1b
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@orcfile_merge1b
+PREHOOK: Output: default@orcfile_merge1b
+POSTHOOK: query: DROP TABLE orcfile_merge1b
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@orcfile_merge1b
+POSTHOOK: Output: default@orcfile_merge1b
+PREHOOK: query: DROP TABLE orcfile_merge1c
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@orcfile_merge1c
+PREHOOK: Output: default@orcfile_merge1c
+POSTHOOK: query: DROP TABLE orcfile_merge1c
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@orcfile_merge1c
+POSTHOOK: Output: default@orcfile_merge1c