Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/12 03:03:40 UTC

svn commit: r1651024 [1/3] - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ data/conf/ data/conf/spark/ hbase-handler/ hcatalog/webhcat/svr/ itests/ itests/hive-unit/ itests/hive-unit/src/...

Author: szehon
Date: Mon Jan 12 02:03:38 2015
New Revision: 1651024

URL: http://svn.apache.org/r1651024
Log:
HIVE-9257 : Merge from spark to trunk January 2015 (Szehon, reviewed by Brock and Xuefu)

Added:
    hive/trunk/data/conf/spark/
      - copied from r1650662, hive/branches/spark/data/conf/spark/
    hive/trunk/hcatalog/webhcat/svr/.gitignore
      - copied unchanged from r1650662, hive/branches/spark/hcatalog/webhcat/svr/.gitignore
    hive/trunk/itests/.gitignore
      - copied unchanged from r1650662, hive/branches/spark/itests/.gitignore
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
      - copied unchanged from r1650662, hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
      - copied unchanged from r1650662, hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
    hive/trunk/itests/qtest-spark/
      - copied from r1650662, hive/branches/spark/itests/qtest-spark/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
      - copied from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/
      - copied from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/
      - copied from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkBucketMapJoinContext.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkBucketMapJoinContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
      - copied unchanged from r1650662, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/
      - copied from r1650662, hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/
    hive/trunk/ql/src/test/queries/clientpositive/auto_join_stats.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/auto_join_stats.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_join_stats2.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/auto_join_stats2.q
    hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_spark1.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark1.q
    hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_spark2.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark2.q
    hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_spark3.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark3.q
    hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_spark4.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark4.q
    hive/trunk/ql/src/test/queries/clientpositive/multi_insert_mixed.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q
    hive/trunk/ql/src/test/queries/clientpositive/multi_insert_union_src.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_union_src.q
    hive/trunk/ql/src/test/queries/clientpositive/parallel_join0.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/parallel_join0.q
    hive/trunk/ql/src/test/queries/clientpositive/parallel_join1.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/parallel_join1.q
    hive/trunk/ql/src/test/queries/clientpositive/spark_test.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/spark_test.q
    hive/trunk/ql/src/test/queries/clientpositive/udf_example_add.q
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/queries/clientpositive/udf_example_add.q
    hive/trunk/ql/src/test/results/clientpositive/auto_join_stats.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/auto_join_stats.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join_stats2.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/auto_join_stats2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket_map_join_spark1.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark1.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket_map_join_spark2.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket_map_join_spark3.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark3.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket_map_join_spark4.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark4.q.out
    hive/trunk/ql/src/test/results/clientpositive/multi_insert_mixed.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out
    hive/trunk/ql/src/test/results/clientpositive/multi_insert_union_src.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_union_src.q.out
    hive/trunk/ql/src/test/results/clientpositive/parallel_join0.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/parallel_join0.q.out
    hive/trunk/ql/src/test/results/clientpositive/parallel_join1.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/parallel_join1.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/
      - copied from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/spark/
    hive/trunk/ql/src/test/results/clientpositive/spark_test.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/spark_test.q.out
    hive/trunk/ql/src/test/results/clientpositive/udf_example_add.q.out
      - copied unchanged from r1650662, hive/branches/spark/ql/src/test/results/clientpositive/udf_example_add.q.out
    hive/trunk/spark-client/
      - copied from r1650662, hive/branches/spark/spark-client/
Modified:
    hive/trunk/   (props changed)
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/data/conf/hive-log4j.properties
    hive/trunk/hbase-handler/pom.xml   (props changed)
    hive/trunk/itests/hive-unit/pom.xml
    hive/trunk/itests/pom.xml
    hive/trunk/itests/src/test/resources/testconfiguration.properties
    hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
    hive/trunk/pom.xml
    hive/trunk/ql/pom.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BucketMapJoinContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
    hive/trunk/ql/src/test/queries/clientpositive/disable_merge_for_bucketing.q
    hive/trunk/ql/src/test/queries/clientpositive/merge1.q
    hive/trunk/ql/src/test/queries/clientpositive/merge2.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoin_union_remove_1.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoin_union_remove_2.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_1.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_10.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_11.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_12.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_13.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_14.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_15.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_16.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_17.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_18.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_19.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_2.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_20.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_21.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_22.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_23.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_24.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_25.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_3.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_4.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_5.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_6.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_7.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_8.q
    hive/trunk/ql/src/test/queries/clientpositive/union_remove_9.q
    hive/trunk/ql/src/test/results/clientpositive/spark/alter_merge_orc.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/alter_merge_stats_orc.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/vector_cast_constant.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out
    hive/trunk/serde/pom.xml
    hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java
    hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
    hive/trunk/shims/0.23/pom.xml
    hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
    hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java

Propchange: hive/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 12 02:03:38 2015
@@ -1,4 +1,5 @@
 /hive/branches/branch-0.11:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
 /hive/branches/cbo:1605012-1627125
+/hive/branches/spark:1608589-1650662
 /hive/branches/tez:1494760-1622766
 /hive/branches/vectorization:1466908-1527856

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java Mon Jan 12 02:03:38 2015
@@ -55,6 +55,8 @@ public class StatsSetupConst {
       public String getAggregator(Configuration conf) {
         if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
           return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregatorTez";
+        } else if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+          return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregatorSpark";
         }
         return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregator"; }
     },
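
The hunk above extends Hive's engine-dispatch idiom: the value of hive.execution.engine selects the counter-based stats aggregator class, with "spark" now handled alongside "tez" and the MapReduce default. A minimal sketch of that idiom, using hypothetical class names rather than the actual StatsSetupConst code:

  // Illustrative engine-keyed dispatch, analogous to getAggregator() above.
  // EngineDispatch and the returned class names are hypothetical.
  public final class EngineDispatch {
    public static String aggregatorFor(String engine) {
      if ("tez".equals(engine)) {
        return "org.example.stats.TezCounterAggregator";
      } else if ("spark".equals(engine)) {
        return "org.example.stats.SparkCounterAggregator";
      }
      return "org.example.stats.DefaultCounterAggregator"; // MapReduce fallback
    }
  }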

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jan 12 02:03:38 2015
@@ -79,6 +79,15 @@ public class HiveConf extends Configurat
   private final List<String> restrictList = new ArrayList<String>();
 
   private Pattern modWhiteListPattern = null;
+  private boolean isSparkConfigUpdated = false;
+
+  public boolean getSparkConfigUpdated() {
+    return isSparkConfigUpdated;
+  }
+
+  public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+    this.isSparkConfigUpdated = isSparkConfigUpdated;
+  }
 
   static {
     ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
@@ -842,6 +851,7 @@ public class HiveConf extends Configurat
     HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false,
         "Merge small files at the end of a map-reduce job"),
     HIVEMERGETEZFILES("hive.merge.tezfiles", false, "Merge small files at the end of a Tez DAG"),
+    HIVEMERGESPARKFILES("hive.merge.sparkfiles", false, "Merge small files at the end of a Spark DAG Transformation"),
     HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long) (256 * 1000 * 1000),
         "Size of merged files at the end of the job"),
     HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long) (16 * 1000 * 1000),
@@ -1855,8 +1865,8 @@ public class HiveConf extends Configurat
     HIVE_DECODE_PARTITION_NAME("hive.decode.partition.name", false,
         "Whether to show the unquoted partition names in query results."),
 
-    HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", new StringSet("mr", "tez"),
-        "Chooses execution engine. Options are: mr (Map reduce, default) or tez (hadoop 2 only)"),
+    HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", new StringSet("mr", "tez", "spark"),
+        "Chooses execution engine. Options are: mr (Map reduce, default), tez (hadoop 2 only), spark"),
     HIVE_JAR_DIRECTORY("hive.jar.directory", null,
         "This is the location hive in tez mode will look for to find a site wide \n" +
         "installed hive instance."),
@@ -1971,7 +1981,12 @@ public class HiveConf extends Configurat
     TEZ_EXEC_INPLACE_PROGRESS(
         "hive.tez.exec.inplace.progress",
         true,
-        "Updates tez job execution progress in-place in the terminal.")
+        "Updates tez job execution progress in-place in the terminal."),
+    SPARK_CLIENT_FUTURE_TIMEOUT(
+        "hive.spark.client.future.timeout",
+        "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "remote spark client JobHandle future timeout value in seconds.")
     ;
 
     public final String varname;
@@ -2212,6 +2227,7 @@ public class HiveConf extends Configurat
       throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list"
           + "of parameters that can't be modified at runtime");
     }
+    isSparkConfigUpdated = name.startsWith("spark");
     set(name, value);
   }
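
The HiveConf hunks add a dirty flag, isSparkConfigUpdated, that is set whenever a property whose name starts with "spark" is modified at runtime, so callers can tell that an existing Spark session may need to be rebuilt with the new settings. A minimal sketch of that prefix-tracking pattern, assuming a hypothetical TrackedConf class rather than HiveConf itself:

  // Illustrative prefix tracking; TrackedConf is hypothetical, not Hive API.
  import java.util.HashMap;
  import java.util.Map;

  public final class TrackedConf {
    private final Map<String, String> props = new HashMap<String, String>();
    private boolean sparkConfigUpdated = false;

    public void set(String name, String value) {
      // Flag any runtime change to a "spark*" property, as the hunk above does.
      sparkConfigUpdated = name.startsWith("spark");
      props.put(name, value);
    }

    public boolean getSparkConfigUpdated() { return sparkConfigUpdated; }
    public void setSparkConfigUpdated(boolean updated) { sparkConfigUpdated = updated; }
  }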
 

Modified: hive/trunk/data/conf/hive-log4j.properties
URL: http://svn.apache.org/viewvc/hive/trunk/data/conf/hive-log4j.properties?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/data/conf/hive-log4j.properties (original)
+++ hive/trunk/data/conf/hive-log4j.properties Mon Jan 12 02:03:38 2015
@@ -42,7 +42,7 @@ log4j.appender.DRFA.layout=org.apache.lo
 # Pattern format: Date LogLevel LoggerName LogMessage
 #log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
 # Debugging Pattern format
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n
 
 # Warning: If you enable the following appender it will fill up your disk if you don't have a cleanup job!
 # This uses the updated rolling file appender from log4j-extras that supports a reliable time-based rolling policy.

Propchange: hive/trunk/hbase-handler/pom.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 12 02:03:38 2015
@@ -1,6 +1,6 @@
 /hive/branches/branch-0.11/hbase-handler/pom.xml:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
 /hive/branches/cbo/hbase-handler/pom.xml:1605012-1627125
-/hive/branches/spark/hbase-handler/pom.xml:1608589-1621357
+/hive/branches/spark/hbase-handler/pom.xml:1608589-1650662
 /hive/branches/tez/hbase-handler/pom.xml:1494760-1622766
 /hive/branches/vectorization/hbase-handler/pom.xml:1466908-1527856
 /hive/trunk/hbase-handler/pom.xml:1494760-1537575

Modified: hive/trunk/itests/hive-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/pom.xml?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/pom.xml (original)
+++ hive/trunk/itests/hive-unit/pom.xml Mon Jan 12 02:03:38 2015
@@ -29,6 +29,7 @@
 
   <properties>
     <hive.path.to.root>../..</hive.path.to.root>
+    <spark.home>${basedir}/${hive.path.to.root}/itests/hive-unit/target/spark</spark.home>
   </properties>
 
   <dependencies>
@@ -110,6 +111,12 @@
     </dependency>
     <!-- test inter-project -->
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>${junit.version}</version>
@@ -258,6 +265,20 @@
         <artifactId>maven-antrun-plugin</artifactId>
         <executions>
           <execution>
+            <id>download-spark</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <target>
+                <exec executable="bash" dir="${basedir}" failonerror="true">
+                  <arg line="../target/download.sh"/>
+                </exec>
+              </target>
+            </configuration>
+          </execution>
+          <execution>
             <id>setup-metastore-scripts</id>
             <phase>process-test-resources</phase>
             <goals>

Modified: hive/trunk/itests/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/pom.xml?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/itests/pom.xml (original)
+++ hive/trunk/itests/pom.xml Mon Jan 12 02:03:38 2015
@@ -44,10 +44,60 @@
     <profile>
       <id>hadoop-2</id>
       <modules>
+        <module>qtest-spark</module>
         <module>hive-unit-hadoop2</module>
         <module>hive-minikdc</module>
       </modules>
     </profile>
+    <profile>
+      <id>hadoop-1</id>
+      <modules>
+        <module>qtest-spark</module>
+      </modules>
+    </profile>
   </profiles>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>download-spark</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <target>
+                <echo file="target/download.sh">
+                  set -x
+                  /bin/pwd
+                  BASE_DIR=./target
+                  HIVE_ROOT=$BASE_DIR/../../../
+                  DOWNLOAD_DIR=./../thirdparty
+                  download() {
+                    url=$1;
+                    finalName=$2
+                    tarName=$(basename $url)
+                    rm -rf $BASE_DIR/$finalName
+                    if [[ ! -f $DOWNLOAD_DIR/$tarName ]]
+                    then
+                     curl -Sso $DOWNLOAD_DIR/$tarName $url
+                    fi
+                    tar -zxf $DOWNLOAD_DIR/$tarName -C $BASE_DIR
+                    mv $BASE_DIR/${finalName}* $BASE_DIR/$finalName
+                  }
+                  mkdir -p $DOWNLOAD_DIR
+                  download "http://ec2-50-18-79-139.us-west-1.compute.amazonaws.com/data/spark-1.2.1-SNAPSHOT-bin-hadoop2-without-hive.tgz" "spark"
+                  cp -f $HIVE_ROOT/data/conf/spark/log4j.properties $BASE_DIR/spark/conf/
+                </echo>
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>

Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Mon Jan 12 02:03:38 2015
@@ -458,3 +458,526 @@ minimr.query.negative.files=cluster_task
   mapreduce_stack_trace_turnoff_hadoop20.q,\
   minimr_broken_pipe.q,\
   udf_local_resource.q
+
+# tests are sorted use: perl -pe 's@\\\s*\n@ @g' testconfiguration.properties \
+# | awk -F= '/spark.query.files/{print $2}' | perl -pe 's@.q *, *@\n@g' \
+# | egrep -v '^ *$' |  sort -V | uniq | perl -pe 's@\n@.q, \\\n@g' | perl -pe 's@^@  @g'
+spark.query.files=add_part_multiple.q, \
+  alter_merge_orc.q, \
+  alter_merge_stats_orc.q, \
+  annotate_stats_join.q, \
+  auto_join0.q, \
+  auto_join1.q, \
+  auto_join10.q, \
+  auto_join11.q, \
+  auto_join12.q, \
+  auto_join13.q, \
+  auto_join14.q, \
+  auto_join15.q, \
+  auto_join16.q, \
+  auto_join17.q, \
+  auto_join18.q, \
+  auto_join18_multi_distinct.q, \
+  auto_join19.q, \
+  auto_join2.q, \
+  auto_join20.q, \
+  auto_join21.q, \
+  auto_join22.q, \
+  auto_join23.q, \
+  auto_join24.q, \
+  auto_join26.q, \
+  auto_join27.q, \
+  auto_join28.q, \
+  auto_join29.q, \
+  auto_join3.q, \
+  auto_join30.q, \
+  auto_join31.q, \
+  auto_join32.q, \
+  auto_join4.q, \
+  auto_join5.q, \
+  auto_join6.q, \
+  auto_join7.q, \
+  auto_join8.q, \
+  auto_join9.q, \
+  auto_join_filters.q, \
+  auto_join_nulls.q, \
+  auto_join_reordering_values.q, \
+  auto_join_stats.q, \
+  auto_join_stats2.q, \
+  auto_join_without_localtask.q, \
+  auto_smb_mapjoin_14.q, \
+  auto_sortmerge_join_1.q, \
+  auto_sortmerge_join_10.q, \
+  auto_sortmerge_join_12.q, \
+  auto_sortmerge_join_13.q, \
+  auto_sortmerge_join_14.q, \
+  auto_sortmerge_join_15.q, \
+  auto_sortmerge_join_16.q, \
+  auto_sortmerge_join_2.q, \
+  auto_sortmerge_join_3.q, \
+  auto_sortmerge_join_4.q, \
+  auto_sortmerge_join_5.q, \
+  auto_sortmerge_join_6.q, \
+  auto_sortmerge_join_7.q, \
+  auto_sortmerge_join_8.q, \
+  auto_sortmerge_join_9.q, \
+  avro_compression_enabled_native.q, \
+  avro_decimal_native.q, \
+  avro_joins.q, \
+  avro_joins_native.q, \
+  bucket2.q, \
+  bucket3.q, \
+  bucket4.q, \
+  bucket_map_join_1.q, \
+  bucket_map_join_2.q, \
+  bucket_map_join_spark1.q, \
+  bucket_map_join_spark2.q, \
+  bucket_map_join_spark3.q, \
+  bucket_map_join_spark4.q, \
+  bucket_map_join_tez1.q, \
+  bucket_map_join_tez2.q, \
+  bucketmapjoin1.q, \
+  bucketmapjoin10.q, \
+  bucketmapjoin11.q, \
+  bucketmapjoin12.q, \
+  bucketmapjoin13.q, \
+  bucketmapjoin2.q, \
+  bucketmapjoin3.q, \
+  bucketmapjoin4.q, \
+  bucketmapjoin5.q, \
+  bucketmapjoin7.q, \
+  bucketmapjoin8.q, \
+  bucketmapjoin9.q, \
+  bucketmapjoin_negative.q, \
+  bucketmapjoin_negative2.q, \
+  bucketmapjoin_negative3.q, \
+  bucketsortoptimize_insert_2.q, \
+  bucketsortoptimize_insert_4.q, \
+  bucketsortoptimize_insert_6.q, \
+  bucketsortoptimize_insert_7.q, \
+  bucketsortoptimize_insert_8.q, \
+  column_access_stats.q, \
+  count.q, \
+  create_merge_compressed.q, \
+  cross_join.q, \
+  cross_product_check_1.q, \
+  cross_product_check_2.q, \
+  ctas.q, \
+  custom_input_output_format.q, \
+  date_join1.q, \
+  date_udf.q, \
+  decimal_join.q, \
+  disable_merge_for_bucketing.q, \
+  enforce_order.q, \
+  escape_clusterby1.q, \
+  escape_distributeby1.q, \
+  escape_orderby1.q, \
+  escape_sortby1.q, \
+  filter_join_breaktask.q, \
+  filter_join_breaktask2.q, \
+  groupby1.q, \
+  groupby10.q, \
+  groupby11.q, \
+  groupby2.q, \
+  groupby3.q, \
+  groupby3_map.q, \
+  groupby3_map_multi_distinct.q, \
+  groupby3_map_skew.q, \
+  groupby3_noskew.q, \
+  groupby3_noskew_multi_distinct.q, \
+  groupby4.q, \
+  groupby7.q, \
+  groupby7_map.q, \
+  groupby7_map_multi_single_reducer.q, \
+  groupby7_map_skew.q, \
+  groupby7_noskew.q, \
+  groupby7_noskew_multi_single_reducer.q, \
+  groupby8.q, \
+  groupby8_map.q, \
+  groupby8_map_skew.q, \
+  groupby8_noskew.q, \
+  groupby9.q, \
+  groupby_bigdata.q, \
+  groupby_complex_types.q, \
+  groupby_complex_types_multi_single_reducer.q, \
+  groupby_cube1.q, \
+  groupby_map_ppr.q, \
+  groupby_map_ppr_multi_distinct.q, \
+  groupby_multi_insert_common_distinct.q, \
+  groupby_multi_single_reducer.q, \
+  groupby_multi_single_reducer2.q, \
+  groupby_multi_single_reducer3.q, \
+  groupby_position.q, \
+  groupby_ppr.q, \
+  groupby_rollup1.q, \
+  groupby_sort_1_23.q, \
+  groupby_sort_skew_1_23.q, \
+  having.q, \
+  identity_project_remove_skip.q, \
+  index_auto_self_join.q, \
+  innerjoin.q, \
+  input12.q, \
+  input13.q, \
+  input14.q, \
+  input17.q, \
+  input18.q, \
+  input1_limit.q, \
+  input_part2.q, \
+  insert1.q, \
+  insert_into1.q, \
+  insert_into2.q, \
+  insert_into3.q, \
+  join0.q, \
+  join1.q, \
+  join10.q, \
+  join11.q, \
+  join12.q, \
+  join13.q, \
+  join14.q, \
+  join15.q, \
+  join16.q, \
+  join17.q, \
+  join18.q, \
+  join18_multi_distinct.q, \
+  join19.q, \
+  join2.q, \
+  join20.q, \
+  join21.q, \
+  join22.q, \
+  join23.q, \
+  join24.q, \
+  join25.q, \
+  join26.q, \
+  join27.q, \
+  join28.q, \
+  join29.q, \
+  join3.q, \
+  join30.q, \
+  join31.q, \
+  join32.q, \
+  join32_lessSize.q, \
+  join33.q, \
+  join34.q, \
+  join35.q, \
+  join36.q, \
+  join37.q, \
+  join39.q, \
+  join4.q, \
+  join40.q, \
+  join41.q, \
+  join5.q, \
+  join6.q, \
+  join7.q, \
+  join8.q, \
+  join9.q, \
+  join_1to1.q, \
+  join_alt_syntax.q, \
+  join_array.q, \
+  join_casesensitive.q, \
+  join_cond_pushdown_1.q, \
+  join_cond_pushdown_2.q, \
+  join_cond_pushdown_3.q, \
+  join_cond_pushdown_4.q, \
+  join_cond_pushdown_unqual1.q, \
+  join_cond_pushdown_unqual2.q, \
+  join_cond_pushdown_unqual3.q, \
+  join_cond_pushdown_unqual4.q, \
+  join_empty.q, \
+  join_filters_overlap.q, \
+  join_hive_626.q, \
+  join_map_ppr.q, \
+  join_merge_multi_expressions.q, \
+  join_merging.q, \
+  join_rc.q, \
+  join_reorder.q, \
+  join_reorder2.q, \
+  join_reorder3.q, \
+  join_reorder4.q, \
+  join_star.q, \
+  join_thrift.q, \
+  join_vc.q, \
+  join_view.q, \
+  leftsemijoin.q, \
+  leftsemijoin_mr.q, \
+  limit_partition_metadataonly.q, \
+  limit_pushdown.q, \
+  list_bucket_dml_2.q, \
+  load_dyn_part1.q, \
+  load_dyn_part10.q, \
+  load_dyn_part11.q, \
+  load_dyn_part12.q, \
+  load_dyn_part13.q, \
+  load_dyn_part14.q, \
+  load_dyn_part15.q, \
+  load_dyn_part2.q, \
+  load_dyn_part3.q, \
+  load_dyn_part4.q, \
+  load_dyn_part5.q, \
+  load_dyn_part6.q, \
+  load_dyn_part7.q, \
+  load_dyn_part8.q, \
+  load_dyn_part9.q, \
+  louter_join_ppr.q, \
+  mapjoin1.q, \
+  mapjoin_addjar.q, \
+  mapjoin_decimal.q, \
+  mapjoin_distinct.q, \
+  mapjoin_filter_on_outerjoin.q, \
+  mapjoin_mapjoin.q, \
+  mapjoin_memcheck.q, \
+  mapjoin_subquery.q, \
+  mapjoin_subquery2.q, \
+  mapjoin_test_outer.q, \
+  mapreduce1.q, \
+  mapreduce2.q, \
+  merge1.q, \
+  merge2.q, \
+  mergejoins.q, \
+  mergejoins_mixed.q, \
+  metadata_only_queries.q, \
+  metadata_only_queries_with_filters.q, \
+  multi_insert.q, \
+  multi_insert_gby.q, \
+  multi_insert_gby2.q, \
+  multi_insert_gby3.q, \
+  multi_insert_lateral_view.q, \
+  multi_insert_mixed.q, \
+  multi_insert_move_tasks_share_dependencies.q, \
+  multi_join_union.q, \
+  multi_join_union_src.q, \
+  multigroupby_singlemr.q, \
+  optimize_nullscan.q, \
+  order.q, \
+  order2.q, \
+  outer_join_ppr.q, \
+  parallel.q, \
+  parallel_join0.q, \
+  parallel_join1.q, \
+  parquet_join.q, \
+  pcr.q, \
+  ppd_gby_join.q, \
+  ppd_join.q, \
+  ppd_join2.q, \
+  ppd_join3.q, \
+  ppd_join4.q, \
+  ppd_join5.q, \
+  ppd_join_filter.q, \
+  ppd_outer_join1.q, \
+  ppd_outer_join2.q, \
+  ppd_outer_join3.q, \
+  ppd_outer_join4.q, \
+  ppd_outer_join5.q, \
+  ppd_transform.q, \
+  ptf.q, \
+  ptf_decimal.q, \
+  ptf_general_queries.q, \
+  ptf_matchpath.q, \
+  ptf_rcfile.q, \
+  ptf_register_tblfn.q, \
+  ptf_seqfile.q, \
+  ptf_streaming.q, \
+  rcfile_bigdata.q, \
+  reduce_deduplicate_exclude_join.q, \
+  router_join_ppr.q, \
+  sample1.q, \
+  sample10.q, \
+  sample2.q, \
+  sample3.q, \
+  sample4.q, \
+  sample5.q, \
+  sample6.q, \
+  sample7.q, \
+  sample8.q, \
+  sample9.q, \
+  script_env_var1.q, \
+  script_env_var2.q, \
+  script_pipe.q, \
+  scriptfile1.q, \
+  semijoin.q, \
+  skewjoin.q, \
+  skewjoin_noskew.q, \
+  skewjoin_union_remove_1.q, \
+  skewjoin_union_remove_2.q, \
+  skewjoinopt1.q, \
+  skewjoinopt10.q, \
+  skewjoinopt11.q, \
+  skewjoinopt12.q, \
+  skewjoinopt13.q, \
+  skewjoinopt14.q, \
+  skewjoinopt15.q, \
+  skewjoinopt16.q, \
+  skewjoinopt17.q, \
+  skewjoinopt18.q, \
+  skewjoinopt19.q, \
+  skewjoinopt2.q, \
+  skewjoinopt20.q, \
+  skewjoinopt3.q, \
+  skewjoinopt4.q, \
+  skewjoinopt5.q, \
+  skewjoinopt6.q, \
+  skewjoinopt7.q, \
+  skewjoinopt8.q, \
+  skewjoinopt9.q, \
+  smb_mapjoin_1.q, \
+  smb_mapjoin_10.q, \
+  smb_mapjoin_11.q, \
+  smb_mapjoin_12.q, \
+  smb_mapjoin_13.q, \
+  smb_mapjoin_14.q, \
+  smb_mapjoin_15.q, \
+  smb_mapjoin_16.q, \
+  smb_mapjoin_17.q, \
+  smb_mapjoin_18.q, \
+  smb_mapjoin_19.q, \
+  smb_mapjoin_2.q, \
+  smb_mapjoin_20.q, \
+  smb_mapjoin_21.q, \
+  smb_mapjoin_22.q, \
+  smb_mapjoin_25.q, \
+  smb_mapjoin_3.q, \
+  smb_mapjoin_4.q, \
+  smb_mapjoin_5.q, \
+  smb_mapjoin_6.q, \
+  smb_mapjoin_7.q, \
+  smb_mapjoin_8.q, \
+  smb_mapjoin_9.q, \
+  sort.q, \
+  spark_test.q, \
+  stats0.q, \
+  stats1.q, \
+  stats10.q, \
+  stats12.q, \
+  stats13.q, \
+  stats14.q, \
+  stats15.q, \
+  stats16.q, \
+  stats18.q, \
+  stats2.q, \
+  stats20.q, \
+  stats3.q, \
+  stats5.q, \
+  stats6.q, \
+  stats7.q, \
+  stats8.q, \
+  stats9.q, \
+  stats_counter.q, \
+  stats_counter_partitioned.q, \
+  stats_noscan_1.q, \
+  stats_noscan_2.q, \
+  stats_only_null.q, \
+  stats_partscan_1_23.q, \
+  statsfs.q, \
+  subquery_exists.q, \
+  subquery_multiinsert.q, \
+  table_access_keys_stats.q, \
+  temp_table.q, \
+  temp_table_join1.q, \
+  tez_join_tests.q, \
+  tez_joins_explain.q, \
+  timestamp_1.q, \
+  timestamp_2.q, \
+  timestamp_3.q, \
+  timestamp_comparison.q, \
+  timestamp_lazy.q, \
+  timestamp_null.q, \
+  timestamp_udf.q, \
+  transform1.q, \
+  transform2.q, \
+  transform_ppr1.q, \
+  transform_ppr2.q, \
+  udf_example_add.q, \
+  union.q, \
+  union10.q, \
+  union11.q, \
+  union13.q, \
+  union14.q, \
+  union15.q, \
+  union16.q, \
+  union18.q, \
+  union19.q, \
+  union2.q, \
+  union23.q, \
+  union25.q, \
+  union28.q, \
+  union29.q, \
+  union3.q, \
+  union30.q, \
+  union33.q, \
+  union4.q, \
+  union5.q, \
+  union6.q, \
+  union7.q, \
+  union8.q, \
+  union9.q, \
+  union_null.q, \
+  union_ppr.q, \
+  union_remove_1.q, \
+  union_remove_10.q, \
+  union_remove_11.q, \
+  union_remove_15.q, \
+  union_remove_16.q, \
+  union_remove_17.q, \
+  union_remove_18.q, \
+  union_remove_19.q, \
+  union_remove_2.q, \
+  union_remove_20.q, \
+  union_remove_21.q, \
+  union_remove_24.q, \
+  union_remove_25.q, \
+  union_remove_3.q, \
+  union_remove_4.q, \
+  union_remove_5.q, \
+  union_remove_6.q, \
+  union_remove_7.q, \
+  union_remove_8.q, \
+  union_remove_9.q, \
+  uniquejoin.q, \
+  varchar_join1.q, \
+  vector_between_in.q, \
+  vector_cast_constant.q, \
+  vector_char_4.q, \
+  vector_count_distinct.q, \
+  vector_data_types.q, \
+  vector_decimal_aggregate.q, \
+  vector_decimal_mapjoin.q, \
+  vector_distinct_2.q, \
+  vector_elt.q, \
+  vector_groupby_3.q, \
+  vector_left_outer_join.q, \
+  vector_mapjoin_reduce.q, \
+  vector_orderby_5.q, \
+  vector_string_concat.q, \
+  vector_varchar_4.q, \
+  vectorization_0.q, \
+  vectorization_1.q, \
+  vectorization_10.q, \
+  vectorization_11.q, \
+  vectorization_12.q, \
+  vectorization_13.q, \
+  vectorization_14.q, \
+  vectorization_15.q, \
+  vectorization_16.q, \
+  vectorization_2.q, \
+  vectorization_3.q, \
+  vectorization_4.q, \
+  vectorization_5.q, \
+  vectorization_6.q, \
+  vectorization_9.q, \
+  vectorization_decimal_date.q, \
+  vectorization_div0.q, \
+  vectorization_nested_udf.q, \
+  vectorization_not.q, \
+  vectorization_part.q, \
+  vectorization_part_project.q, \
+  vectorization_pushdown.q, \
+  vectorization_short_regress.q, \
+  vectorized_case.q, \
+  vectorized_mapjoin.q, \
+  vectorized_math_funcs.q, \
+  vectorized_nested_mapjoin.q, \
+  vectorized_ptf.q, \
+  vectorized_rcfile_columnar.q, \
+  vectorized_shufflejoin.q, \
+  vectorized_string_funcs.q, \
+  vectorized_timestamp_funcs.q, \
+  windowing.q

Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Mon Jan 12 02:03:38 2015
@@ -281,6 +281,7 @@ public class QTestUtil {
   public enum MiniClusterType {
     mr,
     tez,
+    spark,
     none;
 
     public static MiniClusterType valueForString(String type) {
@@ -288,6 +289,8 @@ public class QTestUtil {
         return mr;
       } else if (type.equals("tez")) {
         return tez;
+      } else if (type.equals("spark")) {
+        return spark;
       } else {
         return none;
       }
@@ -323,7 +326,7 @@ public class QTestUtil {
     HadoopShims shims = ShimLoader.getHadoopShims();
     int numberOfDataNodes = 4;
 
-    if (clusterType != MiniClusterType.none) {
+    if (clusterType != MiniClusterType.none && clusterType != MiniClusterType.spark) {
       dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
       FileSystem fs = dfs.getFileSystem();
       String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
@@ -794,7 +797,7 @@ public class QTestUtil {
     ss.setIsSilent(true);
     SessionState oldSs = SessionState.get();
 
-    if (oldSs != null && clusterType == MiniClusterType.tez) {
+    if (oldSs != null && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.spark)) {
       oldSs.close();
     }
 
@@ -828,7 +831,7 @@ public class QTestUtil {
     ss.err = System.out;
 
     SessionState oldSs = SessionState.get();
-    if (oldSs != null && clusterType == MiniClusterType.tez) {
+    if (oldSs != null && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.spark)) {
       oldSs.close();
     }
     if (oldSs != null && oldSs.out != null && oldSs.out != System.out) {

Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Mon Jan 12 02:03:38 2015
@@ -47,6 +47,7 @@
     <module>serde</module>
     <module>service</module>
     <module>shims</module>
+    <module>spark-client</module>
     <module>testutils</module>
     <module>packaging</module>
   </modules>
@@ -115,7 +116,7 @@
     <commons-pool.version>1.5.4</commons-pool.version>
     <commons-dbcp.version>1.4</commons-dbcp.version>
     <derby.version>10.11.1.1</derby.version>
-    <guava.version>11.0.2</guava.version>
+    <guava.version>14.0.1</guava.version>
     <groovy.version>2.1.6</groovy.version>
     <hadoop-20S.version>1.2.1</hadoop-20S.version>
     <hadoop-23.version>2.5.0</hadoop-23.version>
@@ -144,11 +145,7 @@
     <opencsv.version>2.3</opencsv.version>
     <mockito-all.version>1.9.5</mockito-all.version>
     <mina.version>2.0.0-M5</mina.version>
-    <!--netty is not a direct dependency but due to a change
-        in artifact name and given that zookeeper < 3.5
-        requires netty < 3.6.0 we force hadoops version
-      -->
-    <netty.version>3.4.0.Final</netty.version>
+    <netty.version>4.0.23.Final</netty.version>
     <parquet.version>1.6.0rc3</parquet.version>
     <pig.version>0.12.0</pig.version>
     <protobuf.version>2.5.0</protobuf.version>
@@ -157,15 +154,19 @@
     <ST4.version>4.0.4</ST4.version>
     <tez.version>0.5.2</tez.version>
     <super-csv.version>2.2.0</super-csv.version>
+    <spark.version>1.2.1-SNAPSHOT</spark.version>
+    <scala.binary.version>2.10</scala.binary.version>
+    <scala.version>2.10.4</scala.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
     <snappy.version>0.2</snappy.version>
     <wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>
     <velocity.version>1.5</velocity.version>
     <xerces.version>2.9.1</xerces.version>
-    <zookeeper.version>3.4.5</zookeeper.version>
+    <zookeeper.version>3.4.6</zookeeper.version>
     <jpam.version>1.1</jpam.version>
     <felix.version>2.4.0</felix.version>
     <curator.version>2.6.0</curator.version>
+    <jsr305.version>3.0.0</jsr305.version>
   </properties>
 
   <repositories>
@@ -201,7 +202,7 @@
       <snapshots>
         <enabled>false</enabled>
       </snapshots>
-    </repository>
+     </repository>
      <repository>
        <id>sonatype-snapshot</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
@@ -212,6 +213,16 @@
          <enabled>false</enabled>
        </snapshots>
      </repository>
+     <repository>
+       <id>spark-snapshot</id>
+       <url>http://ec2-50-18-79-139.us-west-1.compute.amazonaws.com/data/spark_2.10-1.2-SNAPSHOT/</url>
+       <releases>
+         <enabled>false</enabled>
+       </releases>
+       <snapshots>
+         <enabled>true</enabled>
+       </snapshots>
+     </repository>
   </repositories>
 
   <!-- Hadoop dependency management is done at the bottom under profiles -->
@@ -316,7 +327,7 @@
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>
-        <artifactId>netty</artifactId>
+        <artifactId>netty-all</artifactId>
         <version>${netty.version}</version>
       </dependency>
       <dependency>
@@ -843,6 +854,7 @@
             <log4j.debug>true</log4j.debug>
             <!-- don't diry up /tmp -->
             <java.io.tmpdir>${test.tmp.dir}</java.io.tmpdir>
+            <spark.home>${spark.home}</spark.home>
             <!-- Hadoop's minidfs class uses this -->
             <test.build.data>${test.tmp.dir}</test.build.data>
             <!-- required by QTestUtil -->
@@ -1005,26 +1017,6 @@
         </plugins>
       </build>
     </profile>
-
-    <profile>
-    <id>findbugs</id>
-      <reporting>
-        <plugins>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>findbugs-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <configuration>
-              <fork>true</fork>
-              <maxHeap>2048</maxHeap>
-              <jvmArgs>-Djava.awt.headless=true -Xmx2048m -Xms512m</jvmArgs>
-              <excludeFilterFile>${project.parent.basedir}/findbugs/findbugs-exclude.xml</excludeFilterFile>
-            </configuration>
-          </plugin>
-        </plugins>
-      </reporting>
-    </profile>
-
     <!-- hadoop profiles in the root pom are only used for dependency management -->
     <profile>
       <id>hadoop-1</id>
@@ -1132,6 +1124,22 @@
             <artifactId>hadoop-minicluster</artifactId>
             <version>${hadoop-23.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-core</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
         </dependencies>
       </dependencyManagement>
     </profile>

Modified: hive/trunk/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/pom.xml?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/pom.xml (original)
+++ hive/trunk/ql/pom.xml Mon Jan 12 02:03:38 2015
@@ -60,6 +60,11 @@
       <artifactId>hive-shims</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>spark-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <!-- inter-project -->
     <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>
@@ -135,6 +140,12 @@
       <artifactId>avro-mapred</artifactId>
       <classifier>hadoop2</classifier>
       <version>${avro.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.ant</groupId>
@@ -312,6 +323,10 @@
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-yarn-client</artifactId>
        </exclusion>
+       <exclusion>
+         <groupId>javax.servlet</groupId>
+         <artifactId>servlet-api</artifactId>
+       </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -410,6 +425,12 @@
        </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <optional>true</optional>
+    </dependency>
   </dependencies>
 
   <profiles>
@@ -650,6 +671,7 @@
                   <include>org.codehaus.jackson:jackson-mapper-asl</include>
                   <include>com.google.guava:guava</include>
                   <include>net.sf.opencsv:opencsv</include>
+                  <include>org.apache.hive:spark-client</include>
                 </includes>
               </artifactSet>
               <relocations>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Mon Jan 12 02:03:38 2015
@@ -539,6 +539,13 @@ public class Context {
    * Today this translates into running hadoop jobs locally
    */
   public boolean isLocalOnlyExecutionMode() {
+    // Always allow spark to run in a cluster mode. Without this, depending on
+    // user's local hadoop settings, true may be returned, which causes plan to be
+    // stored in local path.
+    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return false;
+    }
+
     return ShimLoader.getHadoopShims().isLocalMode(conf);
   }
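
The comment in the Context.java hunk explains the reasoning: when the engine is Spark, local-only execution mode is always reported as false so the plan is never staged on a local path, regardless of the user's local Hadoop settings. A minimal sketch of that guard, with hypothetical names standing in for the Hive classes:

  // Illustrative guard; EngineAwarePlanner and its parameters are hypothetical.
  public final class EngineAwarePlanner {
    public static boolean isLocalOnly(String engine, boolean hadoopLocalMode) {
      // Spark always runs through the cluster path, so ignore local-mode Hadoop
      // settings and keep the plan on non-local storage.
      if ("spark".equals(engine)) {
        return false;
      }
      return hadoopLocalMode;
    }
  }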
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Mon Jan 12 02:03:38 2015
@@ -1343,7 +1343,8 @@ public class Driver implements CommandPr
       }
 
       int jobs = Utilities.getMRTasks(plan.getRootTasks()).size()
-        + Utilities.getTezTasks(plan.getRootTasks()).size();
+        + Utilities.getTezTasks(plan.getRootTasks()).size()
+        + Utilities.getSparkTasks(plan.getRootTasks()).size();
       if (jobs > 0) {
         console.printInfo("Query ID = " + plan.getQueryId());
         console.printInfo("Total jobs = " + jobs);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java Mon Jan 12 02:03:38 2015
@@ -34,6 +34,8 @@ public class HashTableLoaderFactory {
   public static HashTableLoader getLoader(Configuration hconf) {
     if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       return new org.apache.hadoop.hive.ql.exec.tez.HashTableLoader();
+    } else if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return new org.apache.hadoop.hive.ql.exec.spark.HashTableLoader();
     } else {
       return new org.apache.hadoop.hive.ql.exec.mr.HashTableLoader();
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Mon Jan 12 02:03:38 2015
@@ -48,11 +48,7 @@ import org.apache.hadoop.hive.ql.hooks.R
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.plan.ExplainWork;
-import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.security.authorization.AuthorizationFactory;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -428,6 +424,36 @@ public class ExplainTask extends Task<Ex
               json.accumulate(ent.getKey().toString(), jsonDep);
             }
           }
+        } else if (ent.getValue() != null && !((List<?>) ent.getValue()).isEmpty()
+            && ((List<?>) ent.getValue()).get(0) != null &&
+            ((List<?>) ent.getValue()).get(0) instanceof SparkWork.Dependency) {
+          if (out != null) {
+            boolean isFirst = true;
+            for (SparkWork.Dependency dep: (List<SparkWork.Dependency>) ent.getValue()) {
+              if (!isFirst) {
+                out.print(", ");
+              } else {
+                out.print("<- ");
+                isFirst = false;
+              }
+              out.print(dep.getName());
+              out.print(" (");
+              out.print(dep.getShuffleType());
+              out.print(", ");
+              out.print(dep.getNumPartitions());
+              out.print(")");
+            }
+            out.println();
+          }
+          if (jsonOutput) {
+            for (SparkWork.Dependency dep: (List<SparkWork.Dependency>) ent.getValue()) {
+              JSONObject jsonDep = new JSONObject();
+              jsonDep.put("parent", dep.getName());
+              jsonDep.put("type", dep.getShuffleType());
+              jsonDep.put("partitions", dep.getNumPartitions());
+              json.accumulate(ent.getKey().toString(), jsonDep);
+            }
+          }
         } else {
           if (out != null) {
             out.print(ent.getValue().toString());
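
In the ExplainTask change above, each SparkWork.Dependency is printed as "<- parent (SHUFFLE_TYPE, numPartitions)" in the text plan and accumulated as {parent, type, partitions} objects in the JSON plan. A minimal sketch of the text rendering, with a hypothetical Dep class standing in for SparkWork.Dependency:

  // Illustrative rendering of a dependency list; Dep is a hypothetical stand-in.
  import java.util.Arrays;
  import java.util.List;

  public final class DepRenderer {
    public static final class Dep {
      final String name; final String shuffleType; final int numPartitions;
      Dep(String name, String shuffleType, int numPartitions) {
        this.name = name; this.shuffleType = shuffleType; this.numPartitions = numPartitions;
      }
    }

    public static String render(List<Dep> deps) {
      StringBuilder sb = new StringBuilder();
      boolean first = true;
      for (Dep d : deps) {
        sb.append(first ? "<- " : ", ");
        first = false;
        sb.append(d.name).append(" (").append(d.shuffleType)
          .append(", ").append(d.numPartitions).append(")");
      }
      return sb.toString();
    }

    public static void main(String[] args) {
      // Prints: <- Map 1 (SHUFFLE, 10), Map 2 (SORT, 5)
      System.out.println(render(Arrays.asList(
          new Dep("Map 1", "SHUFFLE", 10), new Dep("Map 2", "SORT", 5))));
    }
  }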

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Mon Jan 12 02:03:38 2015
@@ -59,7 +59,7 @@ public class FilterOperator extends Oper
       }
 
       conditionInspector = null;
-      ioContext = IOContext.get(hconf.get(Utilities.INPUT_NAME));
+      ioContext = IOContext.get(hconf);
     } catch (Throwable e) {
       throw new HiveException(e);
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Mon Jan 12 02:03:38 2015
@@ -91,19 +91,16 @@ public class HashTableSinkOperator exten
   private transient List<ObjectInspector>[] joinFilterObjectInspectors;
 
   private transient Byte[] order; // order in which the results should
-  private Configuration hconf;
+  protected Configuration hconf;
 
-  private transient MapJoinPersistableTableContainer[] mapJoinTables;
-  private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+  protected transient MapJoinPersistableTableContainer[] mapJoinTables;
+  protected transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+
+  private final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+  private final MapJoinEagerRowContainer EMPTY_ROW_CONTAINER = new MapJoinEagerRowContainer();
 
-  private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
-  private static final MapJoinEagerRowContainer EMPTY_ROW_CONTAINER = new MapJoinEagerRowContainer();
-  static {
-    EMPTY_ROW_CONTAINER.addRow(EMPTY_OBJECT_ARRAY);
-  }
-  
   private long rowNumber = 0;
-  private transient LogHelper console;
+  protected transient LogHelper console;
   private long hashTableScale;
   private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
   
@@ -121,6 +118,7 @@ public class HashTableSinkOperator exten
     boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
     console = new LogHelper(LOG, isSilent);
     memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
+    EMPTY_ROW_CONTAINER.addRow(EMPTY_OBJECT_ARRAY);
 
     // for small tables only; so get the big table position first
     posBigTableAlias = conf.getPosBigTable();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Mon Jan 12 02:03:38 2015
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec;
 
 import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
@@ -103,6 +106,8 @@ public final class OperatorFactory {
         HashTableDummyOperator.class));
     opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
         HashTableSinkOperator.class));
+    opvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class,
+        SparkHashTableSinkOperator.class));
     opvec.add(new OpTuple<DummyStoreDesc>(DummyStoreDesc.class,
         DummyStoreOperator.class));
     opvec.add(new OpTuple<DemuxDesc>(DemuxDesc.class,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Jan 12 02:03:38 2015
@@ -53,6 +53,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.spark.SparkFiles;
 
 /**
  * ScriptOperator.
@@ -214,6 +215,7 @@ public class ScriptOperator extends Oper
       if (pathenv == null || pathSep == null || fileSep == null) {
         return null;
       }
+
       int val = -1;
       String classvalue = pathenv + pathSep;
 
@@ -332,6 +334,11 @@ public class ScriptOperator extends Oper
         if (!new File(prog).isAbsolute()) {
           PathFinder finder = new PathFinder("PATH");
           finder.prependPathComponent(currentDir.toString());
+
+          // In Spark local mode, we need to search for added files in the root directory.
+          if (HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+            finder.prependPathComponent(SparkFiles.getRootDirectory());
+          }
           File f = finder.getAbsolutePath(prog);
           if (f != null) {
             cmdArgs[0] = f.getAbsolutePath();
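
The prepended component comes from SparkFiles.getRootDirectory(), the per-task directory where files added through SparkContext.addFile() are materialized; it only resolves inside a running Spark task. A hedged sketch of looking up an added script there (file name illustrative):

    import java.io.File;
    import org.apache.spark.SparkFiles;

    public class SparkFilesLookupSketch {
      // Resolve a script previously shipped with SparkContext.addFile(...). Must run inside a
      // Spark task; outside one, SparkFiles has no SparkEnv to consult.
      public static File resolveAddedScript(String fileName) {
        File candidate = new File(SparkFiles.getRootDirectory(), fileName);
        return candidate.isFile() ? candidate : null;
      }
    }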

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Mon Jan 12 02:03:38 2015
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.Fu
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -103,6 +105,7 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<IndexMetadataChangeWork>(IndexMetadataChangeWork.class,
         IndexMetadataChangeTask.class));
     taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
+    taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class));
 
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Jan 12 02:03:38 2015
@@ -112,6 +112,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
@@ -360,17 +361,21 @@ public final class Utilities {
     InputStream in = null;
     try {
       path = getPlanPath(conf, name);
+      LOG.info("PLAN PATH = " + path);
       assert path != null;
-      if (!gWorkMap.containsKey(path)) {
+      if (!gWorkMap.containsKey(path)
+        || HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
         Path localPath;
         if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
           localPath = new Path(name);
         } else if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
           localPath = path;
         } else {
+          LOG.info("***************non-local mode***************");
           localPath = new Path(name);
         }
-
+        localPath = path;
+        LOG.info("local path = " + localPath);
         if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
           LOG.debug("Loading plan from string: "+path.toUri().getPath());
           String planString = conf.get(path.toUri().getPath());
@@ -382,7 +387,8 @@ public final class Utilities {
           in = new ByteArrayInputStream(planBytes);
           in = new InflaterInputStream(in);
         } else {
-          in = new FileInputStream(localPath.toUri().getPath());
+          LOG.info("Open file to read in plan: " + localPath);
+          in = localPath.getFileSystem(conf).open(localPath);
         }
 
         if(MAP_PLAN_NAME.equals(name)){
@@ -416,6 +422,7 @@ public final class Utilities {
       return gWork;
     } catch (FileNotFoundException fnf) {
       // happens. e.g.: no reduce work.
+      LOG.info("File not found: " + fnf.getMessage());
       LOG.info("No plan file found: "+path);
       return null;
     } catch (Exception e) {
@@ -967,6 +974,23 @@ public final class Utilities {
   }
 
   /**
+   * Clones using the powers of XML. Do not use unless necessary.
+   * @param plan The plan.
+   * @return The clone.
+   */
+  public static BaseWork cloneBaseWork(BaseWork plan) {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    Configuration conf = new HiveConf();
+    serializePlan(plan, baos, conf, true);
+    BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+        plan.getClass(), conf, true);
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
+    return newPlan;
+  }
+
+  /**
    * Serialize the object. This helper function mainly makes sure that enums,
    * counters, etc are handled properly.
    */
@@ -2640,6 +2664,27 @@ public final class Utilities {
       }
     }
   }
+
+  public static List<SparkTask> getSparkTasks(List<Task<? extends Serializable>> tasks) {
+    List<SparkTask> sparkTasks = new ArrayList<SparkTask>();
+    if (tasks != null) {
+      getSparkTasks(tasks, sparkTasks);
+    }
+    return sparkTasks;
+  }
+
+  private static void getSparkTasks(List<Task<? extends Serializable>> tasks,
+    List<SparkTask> sparkTasks) {
+    for (Task<? extends Serializable> task : tasks) {
+      if (task instanceof SparkTask && !sparkTasks.contains(task)) {
+        sparkTasks.add((SparkTask) task);
+      }
+
+      if (task.getDependentTasks() != null) {
+        getSparkTasks(task.getDependentTasks(), sparkTasks);
+      }
+    }
+  }
 
   public static List<ExecDriver> getMRTasks(List<Task<? extends Serializable>> tasks) {
     List<ExecDriver> mrTasks = new ArrayList<ExecDriver>();
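
cloneBaseWork and getSparkTasks above are plain static helpers; a hedged usage sketch (plan objects and task lists are illustrative):

    import java.io.Serializable;
    import java.util.List;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
    import org.apache.hadoop.hive.ql.plan.BaseWork;

    public class SparkPlanUtilsSketch {
      // Deep-copy a BaseWork so the copy can be mutated without touching the original plan.
      static BaseWork copyWork(BaseWork original) {
        return Utilities.cloneBaseWork(original);
      }

      // Collect every SparkTask reachable (transitively) from the root tasks of a compiled query.
      static List<SparkTask> collectSparkTasks(List<Task<? extends Serializable>> rootTasks) {
        return Utilities.getSparkTasks(rootTasks);
      }
    }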

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java Mon Jan 12 02:03:38 2015
@@ -63,7 +63,7 @@ public class ExecMapperContext {
 
   public ExecMapperContext(JobConf jc) {
     this.jc = jc;
-    ioCxt = IOContext.get(jc.get(Utilities.INPUT_NAME));
+    ioCxt = IOContext.get(jc);
   }
 
   public void clear() {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java Mon Jan 12 02:03:38 2015
@@ -115,7 +115,6 @@ public class MapJoinEagerRowContainer
 
   public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
   throws IOException, SerDeException {
-    clearRows();
     long numRows = in.readLong();
     for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
       container.readFields(in);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Mon Jan 12 02:03:38 2015
@@ -19,15 +19,20 @@
 package org.apache.hadoop.hive.ql.exec.persistence;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Constructor;
 import java.util.ConcurrentModificationException;
 import java.util.Map;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
 
 @SuppressWarnings("deprecation")
@@ -86,6 +91,74 @@ public class MapJoinTableContainerSerDe
       throw new HiveException("Error while trying to create table container", e);
     }
   }
+
+  /**
+   * Loads the table container from a folder. Only used on Spark path.
+   * @param fs FileSystem of the folder.
+   * @param folder The folder to load table container.
+   * @return Loaded table.
+   */
+  @SuppressWarnings("unchecked")
+  public MapJoinPersistableTableContainer load(
+      FileSystem fs, Path folder) throws HiveException {
+    try {
+      if (!fs.isDirectory(folder)) {
+        throw new HiveException("Error, not a directory: " + folder);
+      }
+      FileStatus[] fileStatuses = fs.listStatus(folder);
+      if (fileStatuses == null || fileStatuses.length == 0) {
+        return null;
+      }
+
+      SerDe keySerDe = keyContext.getSerDe();
+      SerDe valueSerDe = valueContext.getSerDe();
+      Writable keyContainer = keySerDe.getSerializedClass().newInstance();
+      Writable valueContainer = valueSerDe.getSerializedClass().newInstance();
+
+      MapJoinPersistableTableContainer tableContainer = null;
+
+      for (FileStatus fileStatus: fileStatuses) {
+        Path filePath = fileStatus.getPath();
+        if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
+          throw new HiveException("Error, not a file: " + filePath);
+        }
+        InputStream is = null;
+        ObjectInputStream in = null;
+        try {
+          is = fs.open(filePath, 4096);
+          in = new ObjectInputStream(is);
+          String name = in.readUTF();
+          Map<String, String> metaData = (Map<String, String>) in.readObject();
+          if (tableContainer == null) {
+            tableContainer = create(name, metaData);
+          }
+          int numKeys = in.readInt();
+          for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+            MapJoinKeyObject key = new MapJoinKeyObject();
+            key.read(keyContext, in, keyContainer);
+            if (tableContainer.get(key) == null) {
+              tableContainer.put(key, new MapJoinEagerRowContainer());
+            }
+            MapJoinEagerRowContainer values = (MapJoinEagerRowContainer) tableContainer.get(key);
+            values.read(valueContext, in, valueContainer);
+            tableContainer.put(key, values);
+          }
+        } finally {
+          if (in != null) {
+            in.close();
+          } else if (is != null) {
+            is.close();
+          }
+        }
+      }
+      return tableContainer;
+    } catch (IOException e) {
+      throw new HiveException("IO error while trying to create table container", e);
+    } catch (Exception e) {
+      throw new HiveException("Error while trying to create table container", e);
+    }
+  }
+
   public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer)
       throws HiveException {
     int numKeys = tableContainer.size();
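
On the Spark path, the folder passed to load(FileSystem, Path) is the directory into which the small-table hash files were dumped; a hedged sketch of reading it back (path handling illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinPersistableTableContainer;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;

    public class SmallTableLoadSketch {
      // Read every hash-table file under one folder back into an in-memory container.
      // Returns null when the folder exists but holds no files.
      static MapJoinPersistableTableContainer loadSmallTable(
          MapJoinTableContainerSerDe serde, Configuration conf, Path folder) throws Exception {
        FileSystem fs = folder.getFileSystem(conf);
        return serde.load(fs, folder);
      }
    }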

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Mon Jan 12 02:03:38 2015
@@ -162,7 +162,7 @@ public abstract class HiveContextAwareRe
   }
 
   public IOContext getIOContext() {
-    return IOContext.get(jobConf.get(Utilities.INPUT_NAME));
+    return IOContext.get(jobConf);
   }
 
   private void initIOContext(long startPos, boolean isBlockPointer,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Mon Jan 12 02:03:38 2015
@@ -21,8 +21,10 @@ package org.apache.hadoop.hive.ql.io;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 
 /**
  * IOContext basically contains the position information of the current
@@ -42,12 +44,21 @@ public class IOContext {
     protected synchronized IOContext initialValue() { return new IOContext(); }
  };
 
+  private static IOContext get() {
+    return IOContext.threadLocal.get();
+  }
+
   /**
    * Tez and MR use this map but are single threaded per JVM thus no synchronization is required.
    */
   private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
 
-  public static IOContext get(String inputName) {
+
+  public static IOContext get(Configuration conf) {
+    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return get();
+    }
+    String inputName = conf.get(Utilities.INPUT_NAME);
     if (!inputNameIOContextMap.containsKey(inputName)) {
       IOContext ioContext = new IOContext();
       inputNameIOContextMap.put(inputName, ioContext);
@@ -219,4 +230,5 @@ public class IOContext {
     this.comparison = null;
     this.genericUDFClassName = null;
   }
+
 }
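
Under Spark, several tasks can run as threads inside one executor JVM, so IOContext.get(Configuration) falls back to the thread-local instance instead of the input-name map shared by MR and Tez. A self-contained illustration of that per-thread isolation, with a hypothetical PerThreadCtx standing in for IOContext:

    public class ThreadLocalIoContextSketch {
      // Hypothetical stand-in for IOContext; only the isolation property matters here.
      static class PerThreadCtx {
        long currentBlockStart;
      }

      private static final ThreadLocal<PerThreadCtx> CTX = ThreadLocal.withInitial(PerThreadCtx::new);

      public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
          PerThreadCtx ctx = CTX.get();
          ctx.currentBlockStart = Thread.currentThread().getId();
          // Each task thread sees only its own context, so concurrent tasks in one
          // executor JVM cannot clobber each other's read positions.
          System.out.println(Thread.currentThread().getName() + " -> " + ctx.currentBlockStart);
        };
        Thread t1 = new Thread(task, "task-1");
        Thread t2 = new Thread(task, "task-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
      }
    }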

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Mon Jan 12 02:03:38 2015
@@ -65,6 +65,19 @@ public class PerfLogger {
   public static final String LOAD_HASHTABLE = "LoadHashtable";
   public static final String ORC_GET_SPLITS = "OrcGetSplits";
 
+  public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
+  public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
+  public static final String SPARK_BUILD_RDD_GRAPH = "SparkBuildRDDGraph";
+  public static final String SPARK_SUBMIT_JOB = "SparkSubmitJob";
+  public static final String SPARK_RUN_JOB = "SparkRunJob";
+  public static final String SPARK_CREATE_TRAN = "SparkCreateTran.";
+  public static final String SPARK_RUN_STAGE = "SparkRunStage.";
+  public static final String SPARK_INIT_OPERATORS = "SparkInitializeOperators";
+  public static final String SPARK_GENERATE_TASK_TREE = "SparkGenerateTaskTree";
+  public static final String SPARK_OPTIMIZE_OPERATOR_TREE = "SparkOptimizeOperatorTree";
+  public static final String SPARK_OPTIMIZE_TASK_TREE = "SparkOptimizeTaskTree";
+  public static final String SPARK_FLUSH_HASHTABLE = "SparkFlushHashTable.";
+
   protected static final ThreadLocal<PerfLogger> perfLogger = new ThreadLocal<PerfLogger>();
 
   protected final Map<String, Long> startTimes = new HashMap<String, Long>();
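
The new keys pair with the existing PerfLogBegin/PerfLogEnd calls, just as cloneBaseWork above does for CLONE_PLAN; a hedged sketch of timing the Spark plan-generation step:

    import org.apache.hadoop.hive.ql.log.PerfLogger;

    public class SparkPerfLoggingSketch {
      private static final String CLASS_NAME = SparkPerfLoggingSketch.class.getName();

      // Wrap an arbitrary unit of work (stand-in for the real plan generation) in a timed span.
      static void timedBuildPlan(Runnable buildPlan) {
        PerfLogger perfLogger = PerfLogger.getPerfLogger();
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
        try {
          buildPlan.run();
        } finally {
          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
        }
      }
    }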

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java Mon Jan 12 02:03:38 2015
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -204,6 +205,8 @@ abstract public class AbstractBucketJoin
     HashMap<String, Operator<? extends OperatorDesc>> topOps = pGraphContext.getTopOps();
     Map<TableScanOperator, Table> topToTable = pGraphContext.getTopToTable();
 
+    HashMap<String, String> aliasToNewAliasMap = new HashMap<String, String>();
+
     // (partition to bucket file names) and (partition to bucket number) for
     // the big table;
     LinkedHashMap<Partition, List<String>> bigTblPartsToBucketFileNames =
@@ -237,11 +240,14 @@ abstract public class AbstractBucketJoin
         for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry : topOps.entrySet()) {
           if (topOpEntry.getValue() == tso) {
             String newAlias = topOpEntry.getKey();
-            joinAliases.set(index, newAlias);
-            if (baseBigAlias.equals(alias)) {
-              baseBigAlias = newAlias;
+            if (!newAlias.equals(alias)) {
+              joinAliases.set(index, newAlias);
+              if (baseBigAlias.equals(alias)) {
+                baseBigAlias = newAlias;
+              }
+              aliasToNewAliasMap.put(alias, newAlias);
+              alias = newAlias;
             }
-            alias = newAlias;
             break;
           }
         }
@@ -353,6 +359,9 @@ abstract public class AbstractBucketJoin
     context.setJoinAliases(joinAliases);
     context.setBaseBigAlias(baseBigAlias);
     context.setBigTablePartitioned(bigTablePartitioned);
+    if (!aliasToNewAliasMap.isEmpty()) {
+      context.setAliasToNewAliasMap(aliasToNewAliasMap);
+    }
 
     return true;
   }
@@ -433,6 +442,18 @@ abstract public class AbstractBucketJoin
       desc.setBigTablePartSpecToFileMapping(convert(bigTblPartsToBucketFileNames));
     }
 
+    Map<Integer, Set<String>> posToAliasMap = mapJoinOp.getPosToAliasMap();
+    Map<String, String> aliasToNewAliasMap = context.getAliasToNewAliasMap();
+    if (aliasToNewAliasMap != null && posToAliasMap != null) {
+      for (Map.Entry<String, String> entry: aliasToNewAliasMap.entrySet()) {
+        for (Set<String> aliases: posToAliasMap.values()) {
+          if (aliases.remove(entry.getKey())) {
+            aliases.add(entry.getValue());
+          }
+        }
+      }
+    }
+
     // successfully convert to bucket map join
     desc.setBucketMapJoin(true);
   }
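
The loop above replays each alias -> newAlias pair onto the operator's posToAliasMap so that position-to-alias lookups survive the rename; the same replay, reduced to plain collections (contents illustrative):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class AliasRemapSketch {
      public static void main(String[] args) {
        Map<Integer, Set<String>> posToAliasMap = new HashMap<Integer, Set<String>>();
        posToAliasMap.put(0, new HashSet<String>(Arrays.asList("a")));
        posToAliasMap.put(1, new HashSet<String>(Arrays.asList("b")));

        Map<String, String> aliasToNewAliasMap = new HashMap<String, String>();
        aliasToNewAliasMap.put("b", "subq:b");

        for (Map.Entry<String, String> entry : aliasToNewAliasMap.entrySet()) {
          for (Set<String> aliases : posToAliasMap.values()) {
            if (aliases.remove(entry.getKey())) {
              aliases.add(entry.getValue());
            }
          }
        }
        System.out.println(posToAliasMap);  // prints: {0=[a], 1=[subq:b]}
      }
    }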

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Mon Jan 12 02:03:38 2015
@@ -521,7 +521,7 @@ abstract public class AbstractSMBJoinPro
     JoinOperator joinOp,
     SortBucketJoinProcCtx joinContext,
     ParseContext parseContext) throws SemanticException {
-    MapJoinOperator mapJoinOp = MapJoinProcessor.convertMapJoin(
+    MapJoinOperator mapJoinOp = new MapJoinProcessor().convertMapJoin(
       parseContext.getConf(),
       parseContext.getOpParseCtx(),
       joinOp,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java Mon Jan 12 02:03:38 2015
@@ -40,6 +40,10 @@ public class BucketJoinProcCtx implement
   // The set of join operators which can be converted to a bucketed map join
   private Set<JoinOperator> convertedJoinOps = new HashSet<JoinOperator>();
 
+  // While checking whether a mapjoin can be converted to a bucket mapjoin,
+  // some join aliases may be renamed: alias -> newAlias
+  private transient Map<String, String> aliasToNewAliasMap;
+
   private Map<String, List<Integer>> tblAliasToNumberOfBucketsInEachPartition;
   private Map<String, List<List<String>>> tblAliasToBucketedFilePathsInEachPartition;
   private Map<Partition, List<String>> bigTblPartsToBucketFileNames;
@@ -130,4 +134,12 @@ public class BucketJoinProcCtx implement
   public void setBigTablePartitioned(boolean bigTablePartitioned) {
     this.bigTablePartitioned = bigTablePartitioned;
   }
+
+  public void setAliasToNewAliasMap(Map<String, String> aliasToNewAliasMap) {
+    this.aliasToNewAliasMap = aliasToNewAliasMap;
+  }
+
+  public Map<String, String> getAliasToNewAliasMap() {
+    return aliasToNewAliasMap;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java?rev=1651024&r1=1651023&r2=1651024&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java Mon Jan 12 02:03:38 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,7 +28,9 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 
 public class BucketMapjoinProc extends AbstractBucketJoinProc implements NodeProcessor {
   public BucketMapjoinProc(ParseContext pGraphContext) {
@@ -34,7 +38,6 @@ public class BucketMapjoinProc extends A
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       Object... nodeOutputs) throws SemanticException {
     BucketJoinProcCtx context = (BucketJoinProcCtx) procCtx;
@@ -58,4 +61,20 @@ public class BucketMapjoinProc extends A
 
     return null;
   }
+
+  /**
+   * Check if a mapjoin can be converted to a bucket mapjoin,
+   * and do the conversion if possible.
+   */
+  public static void checkAndConvertBucketMapJoin(ParseContext pGraphContext,
+      MapJoinOperator mapJoinOp, QBJoinTree joinCtx, String baseBigAlias,
+      List<String> joinAliases) throws SemanticException {
+    BucketJoinProcCtx ctx = new BucketJoinProcCtx(pGraphContext.getConf());
+    BucketMapjoinProc proc = new BucketMapjoinProc(pGraphContext);
+    Map<Byte, List<ExprNodeDesc>> keysMap = mapJoinOp.getConf().getKeys();
+    if (proc.checkConvertBucketMapJoin(pGraphContext, ctx,
+        joinCtx, keysMap, baseBigAlias, joinAliases)) {
+      proc.convertMapJoinToBucketMapJoin(mapJoinOp, ctx);
+    }
+  }
 }
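
A hedged sketch of how a caller (for example, the Spark planner code added elsewhere in this merge) might drive the new static helper once it holds a converted map join; variable names are illustrative:

    import java.util.List;
    import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
    import org.apache.hadoop.hive.ql.optimizer.BucketMapjoinProc;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.QBJoinTree;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    public class BucketMapJoinConversionSketch {
      // Attempt to upgrade an already-converted map join to a bucket map join; a no-op when
      // the bucketing checks in checkConvertBucketMapJoin fail.
      static void tryBucketMapJoin(ParseContext pCtx, MapJoinOperator mapJoinOp, QBJoinTree joinTree,
          String bigTableAlias, List<String> joinAliases) throws SemanticException {
        BucketMapjoinProc.checkAndConvertBucketMapJoin(
            pCtx, mapJoinOp, joinTree, bigTableAlias, joinAliases);
      }
    }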