Posted to commits@hive.apache.org by om...@apache.org on 2015/11/18 23:41:15 UTC
[33/34] hive git commit: HIVE-12434: Merge branch 'spark' to master
HIVE-12434: Merge branch 'spark' to master
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0a88760f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0a88760f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0a88760f
Branch: refs/heads/master-fixed
Commit: 0a88760f5348f8c8cd9a335f705326ed13df7d8d
Parents: 2c19b7d
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Wed Nov 18 13:38:52 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Wed Nov 18 14:40:11 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +-
.../src/test/templates/TestHBaseCliDriver.vm | 63 +-
.../templates/TestHBaseNegativeCliDriver.vm | 64 +-
.../test/resources/testconfiguration.properties | 20 +-
.../hadoop/hive/hbase/HBaseTestSetup.java | 9 +-
pom.xml | 12 +-
.../hadoop/hive/ql/exec/ScriptOperator.java | 15 +
.../persistence/MapJoinTableContainerSerDe.java | 70 +
.../hive/ql/exec/spark/HashTableLoader.java | 18 +-
.../ql/exec/spark/RemoteHiveSparkClient.java | 4 +-
.../hive/ql/exec/spark/SparkPlanGenerator.java | 17 +
.../hive/ql/exec/spark/SparkUtilities.java | 10 +-
.../spark/status/impl/JobMetricsListener.java | 89 +-
.../mapjoin/VectorMapJoinCommonOperator.java | 4 +-
.../fast/VectorMapJoinFastTableContainer.java | 2 +-
.../hive/ql/optimizer/physical/Vectorizer.java | 4 +-
.../optimizer/spark/SparkMapJoinOptimizer.java | 10 +
.../ql/parse/spark/GenSparkProcContext.java | 2 -
.../hive/ql/parse/spark/GenSparkWork.java | 7 -
ql/src/test/queries/clientpositive/orc_merge1.q | 2 +
ql/src/test/queries/clientpositive/orc_merge2.q | 1 +
ql/src/test/queries/clientpositive/orc_merge3.q | 1 +
ql/src/test/queries/clientpositive/orc_merge4.q | 2 +
ql/src/test/queries/clientpositive/orc_merge5.q | 3 +
ql/src/test/queries/clientpositive/orc_merge6.q | 3 +
ql/src/test/queries/clientpositive/orc_merge7.q | 3 +
ql/src/test/queries/clientpositive/orc_merge8.q | 2 +
ql/src/test/queries/clientpositive/orc_merge9.q | 1 +
.../clientpositive/orc_merge_incompat1.q | 1 +
.../clientpositive/orc_merge_incompat2.q | 1 +
.../clientpositive/spark/orc_merge1.q.out | 485 ++++++
.../clientpositive/spark/orc_merge2.q.out | 268 ++++
.../clientpositive/spark/orc_merge3.q.out | 207 +++
.../clientpositive/spark/orc_merge4.q.out | 231 +++
.../clientpositive/spark/orc_merge5.q.out | 334 +++++
.../clientpositive/spark/orc_merge6.q.out | 508 +++++++
.../clientpositive/spark/orc_merge7.q.out | 619 ++++++++
.../clientpositive/spark/orc_merge8.q.out | 130 ++
.../clientpositive/spark/orc_merge9.q.out | 186 +++
.../spark/orc_merge_incompat1.q.out | 240 +++
.../spark/orc_merge_incompat2.q.out | 370 +++++
.../spark/vector_inner_join.q.out | 853 +++++++++++
.../spark/vector_outer_join0.q.out | 242 +++
.../spark/vector_outer_join1.q.out | 631 ++++++++
.../spark/vector_outer_join2.q.out | 323 ++++
.../spark/vector_outer_join3.q.out | 630 ++++++++
.../spark/vector_outer_join4.q.out | 1000 +++++++++++++
.../spark/vector_outer_join5.q.out | 1406 ++++++++++++++++++
ql/src/test/templates/TestCliDriver.vm | 74 +-
ql/src/test/templates/TestCompareCliDriver.vm | 71 +-
ql/src/test/templates/TestNegativeCliDriver.vm | 70 +-
ql/src/test/templates/TestParseNegative.vm | 65 +-
.../apache/hive/spark/client/JobContext.java | 4 +-
.../hive/spark/client/JobContextImpl.java | 8 +-
.../apache/hive/spark/client/RemoteDriver.java | 60 +-
.../hive/spark/client/SparkClientImpl.java | 2 +-
.../hive/spark/client/SparkClientUtilities.java | 25 +-
.../ptest2/src/main/resources/batch-exec.vm | 2 -
58 files changed, 9000 insertions(+), 488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7cab9ae..2185f85 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -827,8 +827,8 @@ public class HiveConf extends Configuration {
HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100, ""),
HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true,
- "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" +
- "because memory-optimized hashtable cannot be serialized."),
+ "Whether Hive should use memory-optimized hash table for MapJoin.\n" +
+ "Only works on Tez and Spark, because memory-optimized hashtable cannot be serialized."),
HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT("hive.mapjoin.optimized.hashtable.probe.percent",
(float) 0.5, "Probing space percentage of the optimized hashtable"),
HIVEUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", true, "Whether to use hybrid" +
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/templates/TestHBaseCliDriver.vm b/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
index de0be32..6f4a7c1 100644
--- a/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
+++ b/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
@@ -17,38 +17,25 @@
*/
package org.apache.hadoop.hive.cli;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.hive.ql.QTestUtil;
import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
import org.apache.hadoop.hive.hbase.HBaseQTestUtil;
import org.apache.hadoop.hive.hbase.HBaseTestSetup;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
-public class $className extends TestCase {
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class $className {
private static final String HIVE_ROOT = HBaseQTestUtil.ensurePathEndsInSlash(System.getProperty("hive.root"));
private HBaseQTestUtil qt;
- private HBaseTestSetup setup;
+ private static HBaseTestSetup setup = new HBaseTestSetup();
- public static class TestHBaseCliDriverAddTestFromQFiles implements QTestUtil.SuiteAddTestFunctor {
- public void addTestToSuite(TestSuite suite, Object setup, String tName) {
- suite.addTest(new $className("testCliDriver_"+tName, (HBaseTestSetup)setup));
- }
- }
-
- public $className(String name, HBaseTestSetup setup) {
- super(name);
- qt = null;
- this.setup = setup;
- }
-
- @Override
- protected void setUp() {
+ @Before
+ public void setUp() {
MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode");
String initScript = "$initScript";
@@ -65,12 +52,11 @@ public class $className extends TestCase {
}
}
- @Override
- protected void tearDown() {
+ @After
+ public void tearDown() {
try {
qt.shutdown();
- }
- catch (Exception e) {
+ } catch (Exception e) {
System.err.println("Exception: " + e.getMessage());
e.printStackTrace();
System.err.flush();
@@ -78,23 +64,9 @@ public class $className extends TestCase {
}
}
- public static Test suite() {
- Set<String> qFilesToExecute = new HashSet<String>();
- String qFiles = System.getProperty("qfile", "").trim();
- if(!qFiles.isEmpty()) {
- for(String qFile : qFiles.split(",")) {
- qFile = qFile.trim();
- if(!qFile.isEmpty()) {
- qFilesToExecute.add(qFile);
- }
- }
- }
- TestSuite suite = new TestSuite();
- HBaseTestSetup setup = new HBaseTestSetup(suite);
-
- QTestUtil.addTestsToSuiteFromQfileNames("$qFileNamesFile", qFilesToExecute,
- suite, setup, new TestHBaseCliDriverAddTestFromQFiles());
- return setup;
+ @AfterClass
+ public static void closeHBaseConnections() throws Exception {
+ setup.tearDown();
}
#foreach ($qf in $qfiles)
@@ -102,6 +74,7 @@ public class $className extends TestCase {
#set ($eidx = $fname.indexOf('.'))
#set ($tname = $fname.substring(0, $eidx))
#set ($fpath = $qfilesMap.get($fname))
+ @Test
public void testCliDriver_$tname() throws Exception {
runTest("$tname", "$fname", (HIVE_ROOT + "$fpath"));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm b/hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm
index b402585..043bd87 100644
--- a/hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm
+++ b/hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm
@@ -18,38 +18,25 @@
package org.apache.hadoop.hive.cli;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.hive.ql.QTestUtil;
import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
import org.apache.hadoop.hive.hbase.HBaseQTestUtil;
import org.apache.hadoop.hive.hbase.HBaseTestSetup;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
-public class $className extends TestCase {
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class $className {
private static final String HIVE_ROOT = HBaseQTestUtil.ensurePathEndsInSlash(System.getProperty("hive.root"));
private HBaseQTestUtil qt;
- private HBaseTestSetup setup;
-
- public static class TestHBaseNegativeCliDriverAddTestFromQFiles implements QTestUtil.SuiteAddTestFunctor {
- public void addTestToSuite(TestSuite suite, Object setup, String tName) {
- suite.addTest(new $className("testCliDriver_"+tName, (HBaseTestSetup)setup));
- }
- }
-
- public $className(String name, HBaseTestSetup setup) {
- super(name);
- qt = null;
- this.setup = setup;
- }
+ private static HBaseTestSetup setup = new HBaseTestSetup();
- @Override
- protected void setUp() {
+ @Before
+ public void setUp() {
MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode");
String initScript = "$initScript";
@@ -66,12 +53,11 @@ public class $className extends TestCase {
}
}
- @Override
- protected void tearDown() {
+ @After
+ public void tearDown() {
try {
qt.shutdown();
- }
- catch (Exception e) {
+ } catch (Exception e) {
System.err.println("Exception: " + e.getMessage());
e.printStackTrace();
System.err.flush();
@@ -79,24 +65,9 @@ public class $className extends TestCase {
}
}
- public static Test suite() {
- Set<String> qFilesToExecute = new HashSet<String>();
- String qFiles = System.getProperty("qfile", "").trim();
- if(!qFiles.isEmpty()) {
- for(String qFile : qFiles.split(",")) {
- qFile = qFile.trim();
- if(!qFile.isEmpty()) {
- qFilesToExecute.add(qFile);
- }
- }
- }
-
- TestSuite suite = new TestSuite();
- HBaseTestSetup setup = new HBaseTestSetup(suite);
-
- QTestUtil.addTestsToSuiteFromQfileNames("$qFileNamesFile", qFilesToExecute,
- suite, setup, new TestHBaseNegativeCliDriverAddTestFromQFiles());
- return setup;
+ @AfterClass
+ public static void closeHBaseConnections() throws Exception {
+ setup.tearDown();
}
#foreach ($qf in $qfiles)
@@ -104,6 +75,7 @@ public class $className extends TestCase {
#set ($eidx = $fname.indexOf('.'))
#set ($tname = $fname.substring(0, $eidx))
#set ($fpath = $qfilesMap.get($fname))
+ @Test
public void testCliDriver_$tname() throws Exception {
runTest("$tname", "$fname", (HIVE_ROOT + "$fpath"));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index ece43cc..a49a301 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1257,6 +1257,17 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
load_fs2.q,\
load_hdfs_file_with_space_in_the_name.q,\
optrstat_groupby.q,\
+ orc_merge1.q,\
+ orc_merge2.q,\
+ orc_merge3.q,\
+ orc_merge4.q,\
+ orc_merge5.q,\
+ orc_merge6.q,\
+ orc_merge7.q,\
+ orc_merge8.q,\
+ orc_merge9.q,\
+ orc_merge_incompat1.q,\
+ orc_merge_incompat2.q,\
parallel_orderby.q,\
quotedid_smb.q,\
reduce_deduplicate.q,\
@@ -1270,7 +1281,14 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
stats_counter_partitioned.q,\
temp_table_external.q,\
truncate_column_buckets.q,\
- uber_reduce.q
+ uber_reduce.q,\
+ vector_inner_join.q,\
+ vector_outer_join0.q,\
+ vector_outer_join1.q,\
+ vector_outer_join2.q,\
+ vector_outer_join3.q,\
+ vector_outer_join4.q,\
+ vector_outer_join5.q
# These tests are removed from miniSparkOnYarn.query.files
# ql_rewrite_gbtoidx.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
index 300f1cf..e6383dc 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
@@ -45,7 +45,7 @@ import org.apache.zookeeper.Watcher;
* HBaseTestSetup defines HBase-specific test fixtures which are
* reused across testcases.
*/
-public class HBaseTestSetup extends TestSetup {
+public class HBaseTestSetup {
private MiniHBaseCluster hbaseCluster;
private int zooKeeperPort;
@@ -54,10 +54,6 @@ public class HBaseTestSetup extends TestSetup {
private static final int NUM_REGIONSERVERS = 1;
- public HBaseTestSetup(Test test) {
- super(test);
- }
-
public HConnection getConnection() {
return this.hbaseConn;
}
@@ -170,8 +166,7 @@ public class HBaseTestSetup extends TestSetup {
return port;
}
- @Override
- protected void tearDown() throws Exception {
+ public void tearDown() throws Exception {
if (hbaseConn != null) {
hbaseConn.close();
hbaseConn = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4a90cef..c6df4a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,7 +160,7 @@
<ST4.version>4.0.4</ST4.version>
<tez.version>0.8.1-alpha</tez.version>
<super-csv.version>2.2.0</super-csv.version>
- <spark.version>1.4.0</spark.version>
+ <spark.version>1.5.0</spark.version>
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<tempus-fugit.version>1.1</tempus-fugit.version>
@@ -222,16 +222,6 @@
<enabled>false</enabled>
</snapshots>
</repository>
- <repository>
- <id>spark-1.3</id>
- <url>https://s3-us-west-1.amazonaws.com/hive-spark/maven2/spark_2.10-1.3-rc1/</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
</repositories>
<dependencyManagement>
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
index 5df9ea2..63837ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
import org.apache.spark.SparkFiles;
import java.io.BufferedInputStream;
@@ -329,6 +331,7 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
// initialize the user's process only when you receive the first row
if (firstRow) {
firstRow = false;
+ SparkConf sparkConf = null;
try {
String[] cmdArgs = splitArgs(conf.getScriptCmd());
@@ -341,6 +344,7 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
// In spark local mode, we need to search added files in root directory.
if (HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ sparkConf = SparkEnv.get().conf();
finder.prependPathComponent(SparkFiles.getRootDirectory());
}
File f = finder.getAbsolutePath(prog);
@@ -371,6 +375,17 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
String idEnvVarVal = getOperatorId();
env.put(safeEnvVarName(idEnvVarName), idEnvVarVal);
+ // For spark, in non-local mode, any added dependencies are stored at
+ // SparkFiles::getRootDirectory, which is the executor's working directory.
+ // In local mode, we need to manually point the process's working directory to it,
+ // in order to make the dependencies accessible.
+ if (sparkConf != null) {
+ String master = sparkConf.get("spark.master");
+ if (master.equals("local") || master.startsWith("local[")) {
+ pb.directory(new File(SparkFiles.getRootDirectory()));
+ }
+ }
+
scriptPid = pb.start(); // Runtime.getRuntime().exec(wrappedCmdArgs);
DataOutputStream scriptOut = new DataOutputStream(
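
Editor's note on the ScriptOperator hunk above: the added comment explains that files shipped to executors land under SparkFiles.getRootDirectory(), and that only in Spark local mode must the script's working directory be pointed there explicitly. A minimal hedged sketch of that local-mode check, assuming an active SparkEnv (class and method names here are illustrative, not part of the commit):

import java.io.File;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkFiles;

public class SparkLocalModeDirectory {
  /** Directory where added files land when running in Spark local mode, else null. */
  public static File addedFilesDirIfLocal() {
    SparkConf sparkConf = SparkEnv.get().conf();     // assumes a running SparkEnv
    String master = sparkConf.get("spark.master");   // e.g. "local[4]", "yarn-cluster"
    if (master.equals("local") || master.startsWith("local[")) {
      // In local mode the child process does not start inside the SparkFiles root,
      // so callers (like the ScriptOperator change above) must point it there.
      return new File(SparkFiles.getRootDirectory());
    }
    return null;
  }

  public static void main(String[] args) {
    File dir = addedFilesDirIfLocal();
    System.out.println(dir == null ? "not in Spark local mode" : dir.getAbsolutePath());
  }
}
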
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
index e97a9f0..d6deabe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -195,6 +197,74 @@ public class MapJoinTableContainerSerDe {
}
}
+ /**
+ * Loads the small table into a VectorMapJoinFastTableContainer. Only used on Spark path.
+ * @param mapJoinDesc The descriptor for the map join
+ * @param fs FileSystem of the folder.
+ * @param folder The folder to load table container.
+ * @param hconf The hive configuration
+ * @return Loaded table.
+ */
+ @SuppressWarnings("unchecked")
+ public MapJoinTableContainer loadFastContainer(MapJoinDesc mapJoinDesc,
+ FileSystem fs, Path folder, Configuration hconf) throws HiveException {
+ try {
+ if (!fs.isDirectory(folder)) {
+ throw new HiveException("Error, not a directory: " + folder);
+ }
+ FileStatus[] fileStatuses = fs.listStatus(folder);
+ if (fileStatuses == null || fileStatuses.length == 0) {
+ return null;
+ }
+
+ SerDe keySerDe = keyContext.getSerDe();
+ SerDe valueSerDe = valueContext.getSerDe();
+ Writable key = keySerDe.getSerializedClass().newInstance();
+ Writable value = valueSerDe.getSerializedClass().newInstance();
+
+ VectorMapJoinFastTableContainer tableContainer =
+ new VectorMapJoinFastTableContainer(mapJoinDesc, hconf, -1);
+
+ for (FileStatus fileStatus : fileStatuses) {
+ Path filePath = fileStatus.getPath();
+ if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
+ throw new HiveException("Error, not a file: " + filePath);
+ }
+ InputStream is = null;
+ ObjectInputStream in = null;
+ try {
+ is = fs.open(filePath, 4096);
+ in = new ObjectInputStream(is);
+ // skip the name and metadata
+ in.readUTF();
+ in.readObject();
+ int numKeys = in.readInt();
+ for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+ key.readFields(in);
+ long numRows = in.readLong();
+ for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+ value.readFields(in);
+ tableContainer.putRow(null, key, null, value);
+ }
+ }
+ } finally {
+ if (in != null) {
+ in.close();
+ } else if (is != null) {
+ is.close();
+ }
+ }
+ }
+
+ tableContainer.seal();
+ return tableContainer;
+ } catch (IOException e) {
+ throw new HiveException("IO error while trying to create table container", e);
+ } catch (Exception e) {
+ throw new HiveException("Error while trying to create table container", e);
+ }
+ }
+
public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer)
throws HiveException {
int numKeys = tableContainer.size();
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
index 39f9d40..7ada611 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SparkBucketMapJoinContext;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.mapred.JobConf;
@@ -62,6 +63,8 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
private MapJoinOperator joinOp;
private MapJoinDesc desc;
+ private boolean useFastContainer = false;
+
@Override
public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
MapJoinOperator joinOp) {
@@ -69,6 +72,12 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
this.hconf = hconf;
this.joinOp = joinOp;
this.desc = joinOp.getConf();
+ if (desc.getVectorMode() && HiveConf.getBoolVar(
+ hconf, HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
+ VectorMapJoinDesc vectorDesc = desc.getVectorDesc();
+ useFastContainer = vectorDesc != null && vectorDesc.hashTableImplementationType() ==
+ VectorMapJoinDesc.HashTableImplementationType.FAST;
+ }
}
@Override
@@ -98,7 +107,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
FileSystem fs = FileSystem.get(baseDir.toUri(), hconf);
BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
boolean firstContainer = true;
- boolean useOptimizedContainer = HiveConf.getBoolVar(
+ boolean useOptimizedContainer = !useFastContainer && HiveConf.getBoolVar(
hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
for (int pos = 0; pos < mapJoinTables.length; pos++) {
if (pos == desc.getPosBigTable() || mapJoinTables[pos] != null) {
@@ -146,14 +155,17 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException {
LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path);
if (!SparkUtilities.isDedicatedCluster(hconf)) {
- return mapJoinTableSerde.load(fs, path, hconf);
+ return useFastContainer ? mapJoinTableSerde.loadFastContainer(desc, fs, path, hconf) :
+ mapJoinTableSerde.load(fs, path, hconf);
}
MapJoinTableContainer mapJoinTable = SmallTableCache.get(path);
if (mapJoinTable == null) {
synchronized (path.toString().intern()) {
mapJoinTable = SmallTableCache.get(path);
if (mapJoinTable == null) {
- mapJoinTable = mapJoinTableSerde.load(fs, path, hconf);
+ mapJoinTable = useFastContainer ?
+ mapJoinTableSerde.loadFastContainer(desc, fs, path, hconf) :
+ mapJoinTableSerde.load(fs, path, hconf);
SmallTableCache.cache(path, mapJoinTable);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 86b9d67..c4cb2ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -295,11 +295,11 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
// Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
// may need to load classes from this jar in other threads.
- Set<String> addedJars = jc.getAddedJars();
+ Map<String, Long> addedJars = jc.getAddedJars();
if (addedJars != null && !addedJars.isEmpty()) {
SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader());
- localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+ localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
}
Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 6951993..6abef4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -25,6 +26,8 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
@@ -228,6 +231,20 @@ public class SparkPlanGenerator {
byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
boolean caching = isCachingWork(work, sparkWork);
if (work instanceof MapWork) {
+ // Create tmp dir for MergeFileWork
+ if (work instanceof MergeFileWork) {
+ Path outputPath = ((MergeFileWork) work).getOutputDir();
+ Path tempOutPath = Utilities.toTempPath(outputPath);
+ FileSystem fs = outputPath.getFileSystem(jobConf);
+ try {
+ if (!fs.exists(tempOutPath)) {
+ fs.mkdirs(tempOutPath);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Can't make path " + outputPath + " : " + e.getMessage());
+ }
+ }
MapTran mapTran = new MapTran(caching);
HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
mapTran.setMapFunction(mapFunc);
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index cf2c3bc..0268469 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -22,8 +22,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
-import java.util.UUID;
-import java.util.Collection;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FilenameUtils;
@@ -91,11 +89,11 @@ public class SparkUtilities {
*/
public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
Path localFile = new Path(source.getPath());
- // give the uploaded file a UUID
- Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf),
- UUID.randomUUID() + "-" + getFileName(source));
+ Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source));
FileSystem fileSystem = FileSystem.get(conf);
- fileSystem.copyFromLocalFile(localFile, remoteFile);
+ // Overwrite if the remote file already exists. Whether the file can be added
+ // on executor is up to spark, i.e. spark.files.overwrite
+ fileSystem.copyFromLocalFile(false, true, localFile, remoteFile);
Path fullPath = fileSystem.getFileStatus(remoteFile).getPath();
return fullPath.toUri();
}
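
Editor's note on the SparkUtilities hunk above: dropping the UUID prefix means the same session-dir file name can be uploaded repeatedly, so the patch switches to the four-argument Hadoop FileSystem.copyFromLocalFile, whose second argument overwrites an existing destination. A small hedged sketch of that call, with illustrative paths (not the actual Hive session layout):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OverwriteCopyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path local = new Path("/tmp/my-udf.jar");                  // illustrative local file
    Path remote = new Path("/user/hive/session/my-udf.jar");   // illustrative HDFS target
    // delSrc=false keeps the local copy; overwrite=true replaces any existing remote
    // file, which is the behavior uploadToHDFS relies on after this change.
    fs.copyFromLocalFile(false, true, local, remote);
    System.out.println(fs.getFileStatus(remote).getPath().toUri());
  }
}
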
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index 84603d5..09c54c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -23,29 +23,16 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.spark.JavaSparkListener;
import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
-import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
-import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
-import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
-import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
-import org.apache.spark.scheduler.SparkListenerTaskStart;
-import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
-import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
-import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-public class JobMetricsListener implements SparkListener {
+public class JobMetricsListener extends JavaSparkListener {
private static final Logger LOG = LoggerFactory.getLogger(JobMetricsListener.class);
@@ -54,36 +41,6 @@ public class JobMetricsListener implements SparkListener {
private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
@Override
- public void onExecutorRemoved(SparkListenerExecutorRemoved removed) {
-
- }
-
- @Override
- public void onExecutorAdded(SparkListenerExecutorAdded added) {
-
- }
-
- @Override
- public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
-
- }
-
- @Override
- public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
-
- }
-
- @Override
- public void onTaskStart(SparkListenerTaskStart taskStart) {
-
- }
-
- @Override
- public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
-
- }
-
- @Override
public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
int stageId = taskEnd.stageId();
int stageAttemptId = taskEnd.stageAttemptId();
@@ -119,46 +76,6 @@ public class JobMetricsListener implements SparkListener {
jobIdToStageId.put(jobId, intStageIds);
}
- @Override
- public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
-
- }
-
- @Override
- public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
-
- }
-
- @Override
- public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
-
- }
-
- @Override
- public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
-
- }
-
- @Override
- public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
-
- }
-
- @Override
- public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
-
- }
-
- @Override
- public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
-
- }
-
- @Override
- public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
-
- }
-
public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
return allJobMetrics.get(jobId);
}
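
Editor's note on the JobMetricsListener hunk above: extending org.apache.spark.JavaSparkListener (the Java-friendly adapter with no-op defaults, available in Spark 1.x) lets the class drop all of the empty callback overrides that implementing the SparkListener trait from Java required. A hedged minimal sketch of that pattern, not the actual Hive class:

import org.apache.spark.JavaSparkListener;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

public class MinimalMetricsListener extends JavaSparkListener {
  @Override
  public void onJobStart(SparkListenerJobStart jobStart) {
    // Only the callbacks of interest are overridden; JavaSparkListener supplies
    // empty implementations for every other event.
    System.out.println("job " + jobStart.jobId() + " started");
  }

  @Override
  public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
    System.out.println("task ended in stage " + taskEnd.stageId());
  }
}

Such a listener would be registered with SparkContext.addSparkListener, as the remote driver already does for JobMetricsListener.
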
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 6ecfaf7..2502ae2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -526,7 +526,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
break;
case FAST:
// Use our specialized hash table loader.
- hashTableLoader = new VectorMapJoinFastHashTableLoader();
+ hashTableLoader = HiveConf.getVar(
+ hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark") ?
+ HashTableLoaderFactory.getLoader(hconf) : new VectorMapJoinFastHashTableLoader();
break;
default:
throw new RuntimeException("Unknown vector map join hash table implementation type " + hashTableImplementationType.name());
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index bcfc807..bd4a595 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -195,7 +195,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
@Override
public void clear() {
- throw new RuntimeException("Not applicable");
+ // Do nothing
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index ce49b24..a842649 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -181,6 +181,7 @@ public class Vectorizer implements PhysicalPlanResolver {
Set<String> supportedAggregationUdfs = new HashSet<String>();
private HiveConf hiveConf;
+ private boolean isSpark;
public Vectorizer() {
@@ -1163,6 +1164,7 @@ public class Vectorizer implements PhysicalPlanResolver {
LOG.info("Vectorization is disabled");
return physicalContext;
}
+ isSpark = (HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark"));
// create dispatcher and graph walker
Dispatcher disp = new VectorizationDispatcher(physicalContext);
TaskGraphWalker ogw = new TaskGraphWalker(disp);
@@ -2120,7 +2122,7 @@ public class Vectorizer implements PhysicalPlanResolver {
case MAPJOIN:
{
MapJoinDesc desc = (MapJoinDesc) op.getConf();
- boolean specialize = canSpecializeMapJoin(op, desc, isTez);
+ boolean specialize = canSpecializeMapJoin(op, desc, isTez || isSpark);
if (!specialize) {
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 4d2b8d6..005fad2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OpTraits;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
/**
* SparkMapJoinOptimizer cloned from ConvertJoinMapJoin is an optimization that replaces a common join
@@ -89,6 +91,14 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
LOG.info("Convert to non-bucketed map join");
MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
+ // For native vectorized map join, we require the key SerDe to be BinarySortableSerDe
+ // Note: the MJ may not really get natively-vectorized later,
+ // but changing SerDe won't hurt correctness
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED) &&
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+ mapJoinOp.getConf().getKeyTblDesc().getProperties().setProperty(
+ serdeConstants.SERIALIZATION_LIB, BinarySortableSerDe.class.getName());
+ }
if (conf.getBoolVar(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
LOG.info("Check if it can be converted to bucketed map join");
numBuckets = convertJoinBucketMapJoin(joinOp, mapJoinOp,
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index 0a0c791..62237e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -129,7 +129,6 @@ public class GenSparkProcContext implements NodeProcessorCtx {
public final Map<Operator<?>, BaseWork> unionWorkMap;
public final List<UnionOperator> currentUnionOperators;
public final Set<BaseWork> workWithUnionOperators;
- public final Set<ReduceSinkOperator> clonedReduceSinks;
public final Set<FileSinkOperator> fileSinkSet;
public final Map<FileSinkOperator, List<FileSinkOperator>> fileSinkMap;
@@ -180,7 +179,6 @@ public class GenSparkProcContext implements NodeProcessorCtx {
this.unionWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
this.currentUnionOperators = new LinkedList<UnionOperator>();
this.workWithUnionOperators = new LinkedHashSet<BaseWork>();
- this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
this.fileSinkMap = new LinkedHashMap<FileSinkOperator, List<FileSinkOperator>>();
this.pruningSinkSet = new LinkedHashSet<Operator<?>>();
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
index 7a7b558..ea5e414 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
@@ -94,12 +94,6 @@ public class GenSparkWork implements NodeProcessor {
LOG.debug("Root operator: " + root);
LOG.debug("Leaf operator: " + operator);
- if (context.clonedReduceSinks.contains(operator)) {
- // if we're visiting a terminal we've created ourselves,
- // just skip and keep going
- return null;
- }
-
SparkWork sparkWork = context.currentTask.getWork();
SMBMapJoinOperator smbOp = GenSparkUtils.getChildOperator(root, SMBMapJoinOperator.class);
@@ -192,7 +186,6 @@ public class GenSparkWork implements NodeProcessor {
// we've already set this one up. Need to clone for the next work.
r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
(ReduceSinkDesc)r.getConf().clone(), r.getParentOperators());
- context.clonedReduceSinks.add(r);
}
r.getConf().setOutputName(work.getName());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge1.q b/ql/src/test/queries/clientpositive/orc_merge1.q
index a8ac85b..afef1e5 100644
--- a/ql/src/test/queries/clientpositive/orc_merge1.q
+++ b/ql/src/test/queries/clientpositive/orc_merge1.q
@@ -10,6 +10,7 @@ set tez.grouping.max-size=2000;
set hive.merge.tezfiles=false;
set hive.merge.mapfiles=false;
set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
-- SORT_QUERY_RESULTS
@@ -39,6 +40,7 @@ dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/orcfile_merge1/ds=1/part=0/;
set hive.merge.tezfiles=true;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
-- auto-merge slow way
EXPLAIN
INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge2.q b/ql/src/test/queries/clientpositive/orc_merge2.q
index 44ef280..6d229f1 100644
--- a/ql/src/test/queries/clientpositive/orc_merge2.q
+++ b/ql/src/test/queries/clientpositive/orc_merge2.q
@@ -2,6 +2,7 @@ set hive.explain.user=false;
set hive.merge.orcfile.stripe.level=true;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.merge.sparkfiles=true;
DROP TABLE orcfile_merge2a;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge3.q b/ql/src/test/queries/clientpositive/orc_merge3.q
index 9722e6d..f5600c6 100644
--- a/ql/src/test/queries/clientpositive/orc_merge3.q
+++ b/ql/src/test/queries/clientpositive/orc_merge3.q
@@ -1,5 +1,6 @@
set hive.explain.user=false;
set hive.merge.orcfile.stripe.level=true;
+set hive.merge.sparkfiles=true;
DROP TABLE orcfile_merge3a;
DROP TABLE orcfile_merge3b;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge4.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge4.q b/ql/src/test/queries/clientpositive/orc_merge4.q
index 3b50465..536e717 100644
--- a/ql/src/test/queries/clientpositive/orc_merge4.q
+++ b/ql/src/test/queries/clientpositive/orc_merge4.q
@@ -9,12 +9,14 @@ CREATE TABLE orcfile_merge3a (key int, value string)
CREATE TABLE orcfile_merge3b (key int, value string) STORED AS TEXTFILE;
set hive.merge.mapfiles=false;
+set hive.merge.sparkfiles=false;
INSERT OVERWRITE TABLE orcfile_merge3a PARTITION (ds='1')
SELECT * FROM src;
dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/orcfile_merge3a/ds=1/;
set hive.merge.mapfiles=true;
+set hive.merge.sparkfiles=true;
INSERT OVERWRITE TABLE orcfile_merge3a PARTITION (ds='1')
SELECT * FROM src;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge5.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge5.q b/ql/src/test/queries/clientpositive/orc_merge5.q
index 3d32875..c24c407 100644
--- a/ql/src/test/queries/clientpositive/orc_merge5.q
+++ b/ql/src/test/queries/clientpositive/orc_merge5.q
@@ -17,6 +17,7 @@ set hive.merge.mapredfiles=false;
set hive.compute.splits.in.am=true;
set tez.grouping.min-size=1000;
set tez.grouping.max-size=50000;
+set hive.merge.sparkfiles=false;
-- 3 mappers
explain insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
@@ -31,6 +32,7 @@ set hive.merge.orcfile.stripe.level=true;
set hive.merge.tezfiles=true;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
-- 3 mappers
explain insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
@@ -45,6 +47,7 @@ set hive.merge.orcfile.stripe.level=false;
set hive.merge.tezfiles=false;
set hive.merge.mapfiles=false;
set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
analyze table orc_merge5b compute statistics noscan;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge6.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge6.q b/ql/src/test/queries/clientpositive/orc_merge6.q
index 6bdaa9e..1612a8b 100644
--- a/ql/src/test/queries/clientpositive/orc_merge6.q
+++ b/ql/src/test/queries/clientpositive/orc_merge6.q
@@ -18,6 +18,7 @@ set hive.merge.mapredfiles=false;
set hive.compute.splits.in.am=true;
set tez.grouping.min-size=1000;
set tez.grouping.max-size=50000;
+set hive.merge.sparkfiles=false;
-- 3 mappers
explain insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
@@ -36,6 +37,7 @@ set hive.merge.orcfile.stripe.level=true;
set hive.merge.tezfiles=true;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
-- 3 mappers
explain insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
@@ -54,6 +56,7 @@ set hive.merge.orcfile.stripe.level=false;
set hive.merge.tezfiles=false;
set hive.merge.mapfiles=false;
set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
insert overwrite table orc_merge5a partition (year="2001",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge7.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge7.q b/ql/src/test/queries/clientpositive/orc_merge7.q
index 7a351c6..49b81bf 100644
--- a/ql/src/test/queries/clientpositive/orc_merge7.q
+++ b/ql/src/test/queries/clientpositive/orc_merge7.q
@@ -22,6 +22,7 @@ set tez.grouping.max-size=50000;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=false;
+set hive.merge.sparkfiles=false;
-- 3 mappers
explain insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
@@ -40,6 +41,7 @@ set hive.merge.orcfile.stripe.level=true;
set hive.merge.tezfiles=true;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
-- 3 mappers
explain insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
@@ -58,6 +60,7 @@ set hive.merge.orcfile.stripe.level=false;
set hive.merge.tezfiles=false;
set hive.merge.mapfiles=false;
set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge8.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge8.q b/ql/src/test/queries/clientpositive/orc_merge8.q
index 61ea4bf..30a892b 100644
--- a/ql/src/test/queries/clientpositive/orc_merge8.q
+++ b/ql/src/test/queries/clientpositive/orc_merge8.q
@@ -30,6 +30,7 @@ set hive.merge.orcfile.stripe.level=false;
set hive.merge.tezfiles=false;
set hive.merge.mapfiles=false;
set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
insert overwrite table alltypes_orc select * from alltypes;
insert into table alltypes_orc select * from alltypes;
@@ -40,6 +41,7 @@ set hive.merge.orcfile.stripe.level=true;
set hive.merge.tezfiles=true;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
alter table alltypes_orc concatenate;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge9.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge9.q b/ql/src/test/queries/clientpositive/orc_merge9.q
index 010b5a1..5f387ba 100644
--- a/ql/src/test/queries/clientpositive/orc_merge9.q
+++ b/ql/src/test/queries/clientpositive/orc_merge9.q
@@ -15,6 +15,7 @@ set hive.merge.orcfile.stripe.level=true;
set hive.merge.tezfiles=true;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
select count(*) from ts_merge;
alter table ts_merge concatenate;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge_incompat1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge_incompat1.q b/ql/src/test/queries/clientpositive/orc_merge_incompat1.q
index dd58524..b9f6246 100644
--- a/ql/src/test/queries/clientpositive/orc_merge_incompat1.q
+++ b/ql/src/test/queries/clientpositive/orc_merge_incompat1.q
@@ -10,6 +10,7 @@ SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.mapfiles=false;
set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
-- 3 mappers
explain insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/queries/clientpositive/orc_merge_incompat2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_merge_incompat2.q b/ql/src/test/queries/clientpositive/orc_merge_incompat2.q
index a8f8842..11d16c2 100644
--- a/ql/src/test/queries/clientpositive/orc_merge_incompat2.q
+++ b/ql/src/test/queries/clientpositive/orc_merge_incompat2.q
@@ -22,6 +22,7 @@ set tez.am.grouping.max-size=50000;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=false;
+set hive.merge.sparkfiles=false;
explain insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
set hive.exec.orc.default.row.index.stride=1000;
http://git-wip-us.apache.org/repos/asf/hive/blob/0a88760f/ql/src/test/results/clientpositive/spark/orc_merge1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/orc_merge1.q.out b/ql/src/test/results/clientpositive/spark/orc_merge1.q.out
new file mode 100644
index 0000000..86df0a7
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/orc_merge1.q.out
@@ -0,0 +1,485 @@
+PREHOOK: query: -- SORT_QUERY_RESULTS
+
+DROP TABLE orcfile_merge1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- SORT_QUERY_RESULTS
+
+DROP TABLE orcfile_merge1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE orcfile_merge1b
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE orcfile_merge1b
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE orcfile_merge1c
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE orcfile_merge1c
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE orcfile_merge1 (key INT, value STRING)
+ PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orcfile_merge1
+POSTHOOK: query: CREATE TABLE orcfile_merge1 (key INT, value STRING)
+ PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orcfile_merge1
+PREHOOK: query: CREATE TABLE orcfile_merge1b (key INT, value STRING)
+ PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orcfile_merge1b
+POSTHOOK: query: CREATE TABLE orcfile_merge1b (key INT, value STRING)
+ PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orcfile_merge1b
+PREHOOK: query: CREATE TABLE orcfile_merge1c (key INT, value STRING)
+ PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orcfile_merge1c
+POSTHOOK: query: CREATE TABLE orcfile_merge1c (key INT, value STRING)
+ PARTITIONED BY (ds STRING, part STRING) STORED AS ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orcfile_merge1c
+PREHOOK: query: -- merge disabled
+EXPLAIN
+ INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: -- merge disabled
+EXPLAIN
+ INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+ Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.orcfile_merge1
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ ds 1
+ part
+ replace: true
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.orcfile_merge1
+
+ Stage: Stage-2
+ Stats-Aggr Operator
+
+PREHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orcfile_merge1@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orcfile_merge1@ds=1/part=0
+POSTHOOK: Output: default@orcfile_merge1@ds=1/part=1
+POSTHOOK: Lineage: orcfile_merge1 PARTITION(ds=1,part=0).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1 PARTITION(ds=1,part=0).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1 PARTITION(ds=1,part=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1 PARTITION(ds=1,part=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+Found 2 items
+#### A masked pattern was here ####
+PREHOOK: query: -- auto-merge slow way
+EXPLAIN
+ INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: -- auto-merge slow way
+EXPLAIN
+ INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-7 depends on stages: Stage-1 , consists of Stage-4, Stage-3, Stage-5
+ Stage-4
+ Stage-0 depends on stages: Stage-4, Stage-3, Stage-6
+ Stage-2 depends on stages: Stage-0
+ Stage-3
+ Stage-5
+ Stage-6 depends on stages: Stage-5
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.orcfile_merge1b
+
+ Stage: Stage-7
+ Conditional Operator
+
+ Stage: Stage-4
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ ds 1
+ part
+ replace: true
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.orcfile_merge1b
+
+ Stage: Stage-2
+ Stats-Aggr Operator
+
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Spark Merge File Work
+ Map Operator Tree:
+ TableScan
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.orcfile_merge1b
+
+ Stage: Stage-5
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Spark Merge File Work
+ Map Operator Tree:
+ TableScan
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.orcfile_merge1b
+
+ Stage: Stage-6
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
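The "slow" auto-merge adds a Conditional Operator (Stage-7) after the insert. When the written files fall below the merge threshold, one of the merge branches (Stage-3 or Stage-5, labelled Spark Merge File Work) re-reads them through an ordinary TableScan and File Output Operator, so the ORC rows are deserialized and rewritten before Stage-0 publishes the result; that is why the later listing finds a single file. This is the plan shape that stripe-level merging turned off would normally give; assumed settings for illustration:

    -- assumed settings, not quoted from the test
    set hive.merge.sparkfiles=true;                -- add a merge task to the Spark plan
    set hive.merge.orcfile.stripe.level=false;     -- merge by rewriting rows ("slow way")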
+PREHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orcfile_merge1b@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orcfile_merge1b@ds=1/part=0
+POSTHOOK: Output: default@orcfile_merge1b@ds=1/part=1
+POSTHOOK: Lineage: orcfile_merge1b PARTITION(ds=1,part=0).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1b PARTITION(ds=1,part=0).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1b PARTITION(ds=1,part=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1b PARTITION(ds=1,part=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+Found 1 items
+#### A masked pattern was here ####
+PREHOOK: query: -- auto-merge fast way
+EXPLAIN
+ INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: -- auto-merge fast way
+EXPLAIN
+ INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-7 depends on stages: Stage-1 , consists of Stage-4, Stage-3, Stage-5
+ Stage-4
+ Stage-0 depends on stages: Stage-4, Stage-3, Stage-6
+ Stage-2 depends on stages: Stage-0
+ Stage-3
+ Stage-5
+ Stage-6 depends on stages: Stage-5
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.orcfile_merge1c
+
+ Stage: Stage-7
+ Conditional Operator
+
+ Stage: Stage-4
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ ds 1
+ part
+ replace: true
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.orcfile_merge1c
+
+ Stage: Stage-2
+ Stats-Aggr Operator
+
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Spark Merge File Work
+ Merge File Operator
+ Map Operator Tree:
+ ORC File Merge Operator
+ merge level: stripe
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+
+ Stage: Stage-5
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Spark Merge File Work
+ Merge File Operator
+ Map Operator Tree:
+ ORC File Merge Operator
+ merge level: stripe
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+
+ Stage: Stage-6
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
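The "fast" variant keeps the same conditional stage layout, but the merge vertices now carry a Merge File Operator with an ORC File Merge Operator at "merge level: stripe": compatible ORC files are concatenated stripe by stripe without decoding rows, so no Select Operator or serde work appears in Stage-3/Stage-5, and the later listing again finds a single file. Assumed settings for illustration:

    -- assumed settings, not quoted from the test
    set hive.merge.sparkfiles=true;
    set hive.merge.orcfile.stripe.level=true;      -- concatenate ORC stripes directly ("fast way")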
+PREHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orcfile_merge1c@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orcfile_merge1c@ds=1/part=0
+POSTHOOK: Output: default@orcfile_merge1c@ds=1/part=1
+POSTHOOK: Lineage: orcfile_merge1c PARTITION(ds=1,part=0).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1c PARTITION(ds=1,part=0).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1c PARTITION(ds=1,part=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orcfile_merge1c PARTITION(ds=1,part=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+Found 1 items
+#### A masked pattern was here ####
+PREHOOK: query: -- Verify
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+ FROM orcfile_merge1 WHERE ds='1'
+) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1
+PREHOOK: Input: default@orcfile_merge1@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: -- Verify
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+ FROM orcfile_merge1 WHERE ds='1'
+) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1
+POSTHOOK: Input: default@orcfile_merge1@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1@ds=1/part=1
+#### A masked pattern was here ####
+-21975308766
+PREHOOK: query: SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+ FROM orcfile_merge1b WHERE ds='1'
+) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1b
+PREHOOK: Input: default@orcfile_merge1b@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1b@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+ FROM orcfile_merge1b WHERE ds='1'
+) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1b
+POSTHOOK: Input: default@orcfile_merge1b@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1b@ds=1/part=1
+#### A masked pattern was here ####
+-21975308766
+PREHOOK: query: SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+ FROM orcfile_merge1c WHERE ds='1'
+) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1c
+PREHOOK: Input: default@orcfile_merge1c@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1c@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+ FROM orcfile_merge1c WHERE ds='1'
+) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1c
+POSTHOOK: Input: default@orcfile_merge1c@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1c@ds=1/part=1
+#### A masked pattern was here ####
+-21975308766
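The three queries above compute an order-independent checksum of each table: TRANSFORM(*) USING 'tr \t _' serializes every row as one line with its tabs replaced by underscores, HASH maps that string to an int, and SUM aggregates over all rows, so the result does not depend on file layout or row order. All three tables return -21975308766, confirming that neither merge path altered the data. A roughly equivalent check without the external script might look like this (an assumed illustration, not part of the test):

    -- assumed sketch; relies on * expanding to key, value, ds, part as in the TRANSFORM form
    SELECT SUM(HASH(CONCAT_WS('_', CAST(key AS STRING), value, ds, part)))
    FROM orcfile_merge1 WHERE ds='1';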
+PREHOOK: query: select count(*) from orcfile_merge1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1
+PREHOOK: Input: default@orcfile_merge1@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orcfile_merge1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1
+POSTHOOK: Input: default@orcfile_merge1@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1@ds=1/part=1
+#### A masked pattern was here ####
+500
+PREHOOK: query: select count(*) from orcfile_merge1b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1b
+PREHOOK: Input: default@orcfile_merge1b@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1b@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orcfile_merge1b
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1b
+POSTHOOK: Input: default@orcfile_merge1b@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1b@ds=1/part=1
+#### A masked pattern was here ####
+500
+PREHOOK: query: select count(*) from orcfile_merge1c
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orcfile_merge1c
+PREHOOK: Input: default@orcfile_merge1c@ds=1/part=0
+PREHOOK: Input: default@orcfile_merge1c@ds=1/part=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orcfile_merge1c
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orcfile_merge1c
+POSTHOOK: Input: default@orcfile_merge1c@ds=1/part=0
+POSTHOOK: Input: default@orcfile_merge1c@ds=1/part=1
+#### A masked pattern was here ####
+500
+PREHOOK: query: DROP TABLE orcfile_merge1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@orcfile_merge1
+PREHOOK: Output: default@orcfile_merge1
+POSTHOOK: query: DROP TABLE orcfile_merge1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@orcfile_merge1
+POSTHOOK: Output: default@orcfile_merge1
+PREHOOK: query: DROP TABLE orcfile_merge1b
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@orcfile_merge1b
+PREHOOK: Output: default@orcfile_merge1b
+POSTHOOK: query: DROP TABLE orcfile_merge1b
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@orcfile_merge1b
+POSTHOOK: Output: default@orcfile_merge1b
+PREHOOK: query: DROP TABLE orcfile_merge1c
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@orcfile_merge1c
+PREHOOK: Output: default@orcfile_merge1c
+POSTHOOK: query: DROP TABLE orcfile_merge1c
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@orcfile_merge1c
+POSTHOOK: Output: default@orcfile_merge1c