Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/24 09:03:38 UTC

svn commit: r1627235 [1/9] - in /hive/trunk: itests/src/test/resources/ itests/util/src/main/java/org/apache/hadoop/hive/ql/ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thrift...

Author: gunther
Date: Wed Sep 24 07:03:35 2014
New Revision: 1627235

URL: http://svn.apache.org/r1627235
Log:
HIVE-7482: The execution side changes for SMB join in hive-tez (Vikram Dixit K via Gunther Hagleitner)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
    hive/trunk/ql/src/test/queries/clientpositive/tez_smb_1.q
    hive/trunk/ql/src/test/queries/clientpositive/tez_smb_main.q
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_10.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_11.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_12.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_13.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_14.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_15.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_16.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_4.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_8.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_9.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out
Removed:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
Modified:
    hive/trunk/itests/src/test/resources/testconfiguration.properties
    hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
    hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FieldSchema.java
    hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Partition.java
    hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/SerDeInfo.java
    hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java
    hive/trunk/ql/if/queryplan.thrift
    hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
    hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h
    hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
    hive/trunk/ql/src/gen/thrift/gen-php/Types.php
    hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
    hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
    hive/trunk/ql/src/test/results/clientpositive/tez/correlationoptimizer1.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/cross_join.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/cross_product_check_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/filter_join_breaktask.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/join0.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/join1.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/limit_pushdown.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/mrr.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/optimize_nullscan.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/subquery_exists.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/subquery_in.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/tez_join_tests.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/tez_joins_explain.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/tez_union.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/vectorized_dynamic_partition_pruning.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/vectorized_shufflejoin.q.out

Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Wed Sep 24 07:03:35 2014
@@ -171,7 +171,22 @@ minitez.query.files.shared=alter_merge_2
   vectorized_nested_mapjoin.q,\
   vectorized_ptf.q,\
   vectorized_shufflejoin.q,\
-  vectorized_timestamp_funcs.q
+  vectorized_timestamp_funcs.q,\
+  auto_sortmerge_join_1.q,\
+  auto_sortmerge_join_10.q,\
+  auto_sortmerge_join_11.q,\
+  auto_sortmerge_join_12.q,\
+  auto_sortmerge_join_13.q,\
+  auto_sortmerge_join_14.q,\
+  auto_sortmerge_join_15.q,\
+  auto_sortmerge_join_16.q,\
+  auto_sortmerge_join_2.q,\
+  auto_sortmerge_join_3.q,\
+  auto_sortmerge_join_4.q,\
+  auto_sortmerge_join_5.q,\
+  auto_sortmerge_join_7.q,\
+  auto_sortmerge_join_8.q,\
+  auto_sortmerge_join_9.q
 
 minitez.query.files=bucket_map_join_tez1.q,\
   bucket_map_join_tez2.q,\
@@ -189,6 +204,8 @@ minitez.query.files=bucket_map_join_tez1
   tez_schema_evolution.q,\
   tez_union.q,\
   tez_union_decimal.q,\
+  tez_smb_main.q,\
+  tez_smb_1.q,\
   vectorized_dynamic_partition_pruning.q
 
 beeline.positive.exclude=add_part_exist.q,\

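The two hunks above register the new SMB q-files in minitez.query.files.shared and minitez.query.files. These lists follow standard java.util.Properties syntax: a trailing backslash continues the logical line, so each property is one long comma-separated value. A small sketch of how such a list reads back (the snippet mirrors the file above; the parsing is illustrative, not how QTestUtil actually consumes it):

    import java.io.IOException;
    import java.io.StringReader;
    import java.util.Properties;

    public class TestConfSketch {
      public static void main(String[] args) throws IOException {
        // Trailing backslashes join the physical lines into one value;
        // leading whitespace on continuation lines is skipped by load().
        String snippet =
            "minitez.query.files=bucket_map_join_tez1.q,\\\n" +
            "  tez_smb_main.q,\\\n" +
            "  tez_smb_1.q\n";
        Properties props = new Properties();
        props.load(new StringReader(snippet));
        // The value splits on commas into the individual .q test files.
        for (String q : props.getProperty("minitez.query.files").split(",")) {
          System.out.println(q.trim());
        }
      }
    }
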
Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Wed Sep 24 07:03:35 2014
@@ -690,7 +690,10 @@ public class QTestUtil {
     // conf.logVars(System.out);
     // System.out.flush();
 
+    String execEngine = conf.get("hive.execution.engine");
+    conf.set("hive.execution.engine", "mr");
     SessionState.start(conf);
+    conf.set("hive.execution.engine", execEngine);
     db = Hive.get(conf);
     fs = FileSystem.get(conf);
     drv = new Driver(conf);
@@ -771,6 +774,8 @@ public class QTestUtil {
     HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
         "org.apache.hadoop.hive.ql.security.DummyAuthenticator");
 
+    String execEngine = conf.get("hive.execution.engine");
+    conf.set("hive.execution.engine", "mr");
     CliSessionState ss = new CliSessionState(conf);
     assert ss != null;
     ss.in = System.in;
@@ -788,6 +793,7 @@ public class QTestUtil {
 
     isSessionStateStarted = true;
 
+    conf.set("hive.execution.engine", execEngine);
     return ss;
   }
 

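The QTestUtil hunks above save the configured hive.execution.engine, force "mr" while the session starts, and restore the original engine afterwards. A minimal sketch of that save-and-restore pattern, with a plain map standing in for HiveConf and a stub standing in for SessionState.start (both stand-ins are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class SaveRestoreSketch {
      // Stand-in for HiveConf: a mutable string-to-string configuration.
      private static final Map<String, String> conf = new HashMap<String, String>();

      // Stand-in for SessionState.start(conf); assumed here to require
      // the "mr" engine while it initializes.
      private static void startSession() {
        if (!"mr".equals(conf.get("hive.execution.engine"))) {
          throw new IllegalStateException("session init expects mr");
        }
      }

      public static void main(String[] args) {
        conf.put("hive.execution.engine", "tez");

        // Save the configured engine, force "mr" only for initialization,
        // then restore it so the tests still run on the original engine.
        String execEngine = conf.get("hive.execution.engine");
        conf.put("hive.execution.engine", "mr");
        startSession();
        conf.put("hive.execution.engine", execEngine);

        System.out.println(conf.get("hive.execution.engine")); // tez
      }
    }
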
Modified: hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FieldSchema.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FieldSchema.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FieldSchema.java (original)
+++ hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FieldSchema.java Wed Sep 24 07:03:35 2014
@@ -135,9 +135,9 @@ public class FieldSchema implements org.
     String comment)
   {
     this();
-    this.name = org.apache.hive.common.util.HiveStringUtils.intern(name);
-    this.type = org.apache.hive.common.util.HiveStringUtils.intern(type);
-    this.comment = org.apache.hive.common.util.HiveStringUtils.intern(comment);
+    this.name = name;
+    this.type = type;
+    this.comment = comment;
   }
 
   /**
@@ -145,13 +145,13 @@ public class FieldSchema implements org.
    */
   public FieldSchema(FieldSchema other) {
     if (other.isSetName()) {
-      this.name = org.apache.hive.common.util.HiveStringUtils.intern(other.name);
+      this.name = other.name;
     }
     if (other.isSetType()) {
-      this.type = org.apache.hive.common.util.HiveStringUtils.intern(other.type);
+      this.type = other.type;
     }
     if (other.isSetComment()) {
-      this.comment = org.apache.hive.common.util.HiveStringUtils.intern(other.comment);
+      this.comment = other.comment;
     }
   }
 
@@ -171,7 +171,7 @@ public class FieldSchema implements org.
   }
 
   public void setName(String name) {
-    this.name = org.apache.hive.common.util.HiveStringUtils.intern(name);
+    this.name = name;
   }
 
   public void unsetName() {
@@ -194,7 +194,7 @@ public class FieldSchema implements org.
   }
 
   public void setType(String type) {
-    this.type = org.apache.hive.common.util.HiveStringUtils.intern(type);
+    this.type = type;
   }
 
   public void unsetType() {
@@ -217,7 +217,7 @@ public class FieldSchema implements org.
   }
 
   public void setComment(String comment) {
-    this.comment = org.apache.hive.common.util.HiveStringUtils.intern(comment);
+    this.comment = comment;
   }
 
   public void unsetComment() {

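This FieldSchema hunk, and the matching edits to Partition, SerDeInfo and StorageDescriptor below, drop the HiveStringUtils.intern() calls from the generated constructors and setters: plain reference assignment is cheaper per call, at the cost of no longer deduplicating equal strings. A rough sketch of that trade-off, using the JDK's String.intern() as a stand-in for HiveStringUtils (illustrative only):

    public class InternTradeoff {
      private String type;

      // Interning setter: equal strings collapse to one canonical instance,
      // saving heap when many objects carry the same value, but every call
      // pays for the intern-table lookup.
      public void setTypeInterned(String type) {
        this.type = (type == null) ? null : type.intern();
      }

      // Plain setter, as generated after this commit: one reference assignment.
      public void setType(String type) {
        this.type = type;
      }

      public static void main(String[] args) {
        InternTradeoff a = new InternTradeoff();
        InternTradeoff b = new InternTradeoff();
        a.setTypeInterned(new String("string"));
        b.setTypeInterned(new String("string"));
        System.out.println(a.type == b.type); // true: one shared instance
        a.setType(new String("string"));
        b.setType(new String("string"));
        System.out.println(a.type == b.type); // false: two instances
      }
    }
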
Modified: hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Partition.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Partition.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Partition.java (original)
+++ hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Partition.java Wed Sep 24 07:03:35 2014
@@ -182,14 +182,14 @@ public class Partition implements org.ap
   {
     this();
     this.values = values;
-    this.dbName = org.apache.hive.common.util.HiveStringUtils.intern(dbName);
-    this.tableName = org.apache.hive.common.util.HiveStringUtils.intern(tableName);
+    this.dbName = dbName;
+    this.tableName = tableName;
     this.createTime = createTime;
     setCreateTimeIsSet(true);
     this.lastAccessTime = lastAccessTime;
     setLastAccessTimeIsSet(true);
     this.sd = sd;
-    this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+    this.parameters = parameters;
   }
 
   /**
@@ -205,10 +205,10 @@ public class Partition implements org.ap
       this.values = __this__values;
     }
     if (other.isSetDbName()) {
-      this.dbName = org.apache.hive.common.util.HiveStringUtils.intern(other.dbName);
+      this.dbName = other.dbName;
     }
     if (other.isSetTableName()) {
-      this.tableName = org.apache.hive.common.util.HiveStringUtils.intern(other.tableName);
+      this.tableName = other.tableName;
     }
     this.createTime = other.createTime;
     this.lastAccessTime = other.lastAccessTime;
@@ -222,9 +222,9 @@ public class Partition implements org.ap
         String other_element_key = other_element.getKey();
         String other_element_value = other_element.getValue();
 
-        String __this__parameters_copy_key = org.apache.hive.common.util.HiveStringUtils.intern(other_element_key);
+        String __this__parameters_copy_key = other_element_key;
 
-        String __this__parameters_copy_value = org.apache.hive.common.util.HiveStringUtils.intern(other_element_value);
+        String __this__parameters_copy_value = other_element_value;
 
         __this__parameters.put(__this__parameters_copy_key, __this__parameters_copy_value);
       }
@@ -296,7 +296,7 @@ public class Partition implements org.ap
   }
 
   public void setDbName(String dbName) {
-    this.dbName = org.apache.hive.common.util.HiveStringUtils.intern(dbName);
+    this.dbName = dbName;
   }
 
   public void unsetDbName() {
@@ -319,7 +319,7 @@ public class Partition implements org.ap
   }
 
   public void setTableName(String tableName) {
-    this.tableName = org.apache.hive.common.util.HiveStringUtils.intern(tableName);
+    this.tableName = tableName;
   }
 
   public void unsetTableName() {
@@ -420,7 +420,7 @@ public class Partition implements org.ap
   }
 
   public void setParameters(Map<String,String> parameters) {
-    this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+    this.parameters = parameters;
   }
 
   public void unsetParameters() {

Modified: hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/SerDeInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/SerDeInfo.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/SerDeInfo.java (original)
+++ hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/SerDeInfo.java Wed Sep 24 07:03:35 2014
@@ -137,9 +137,9 @@ public class SerDeInfo implements org.ap
     Map<String,String> parameters)
   {
     this();
-    this.name = org.apache.hive.common.util.HiveStringUtils.intern(name);
-    this.serializationLib = org.apache.hive.common.util.HiveStringUtils.intern(serializationLib);
-    this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+    this.name = name;
+    this.serializationLib = serializationLib;
+    this.parameters = parameters;
   }
 
   /**
@@ -147,10 +147,10 @@ public class SerDeInfo implements org.ap
    */
   public SerDeInfo(SerDeInfo other) {
     if (other.isSetName()) {
-      this.name = org.apache.hive.common.util.HiveStringUtils.intern(other.name);
+      this.name = other.name;
     }
     if (other.isSetSerializationLib()) {
-      this.serializationLib = org.apache.hive.common.util.HiveStringUtils.intern(other.serializationLib);
+      this.serializationLib = other.serializationLib;
     }
     if (other.isSetParameters()) {
       Map<String,String> __this__parameters = new HashMap<String,String>();
@@ -159,9 +159,9 @@ public class SerDeInfo implements org.ap
         String other_element_key = other_element.getKey();
         String other_element_value = other_element.getValue();
 
-        String __this__parameters_copy_key = org.apache.hive.common.util.HiveStringUtils.intern(other_element_key);
+        String __this__parameters_copy_key = other_element_key;
 
-        String __this__parameters_copy_value = org.apache.hive.common.util.HiveStringUtils.intern(other_element_value);
+        String __this__parameters_copy_value = other_element_value;
 
         __this__parameters.put(__this__parameters_copy_key, __this__parameters_copy_value);
       }
@@ -185,7 +185,7 @@ public class SerDeInfo implements org.ap
   }
 
   public void setName(String name) {
-    this.name = org.apache.hive.common.util.HiveStringUtils.intern(name);
+    this.name = name;
   }
 
   public void unsetName() {
@@ -208,7 +208,7 @@ public class SerDeInfo implements org.ap
   }
 
   public void setSerializationLib(String serializationLib) {
-    this.serializationLib = org.apache.hive.common.util.HiveStringUtils.intern(serializationLib);
+    this.serializationLib = serializationLib;
   }
 
   public void unsetSerializationLib() {
@@ -242,7 +242,7 @@ public class SerDeInfo implements org.ap
   }
 
   public void setParameters(Map<String,String> parameters) {
-    this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+    this.parameters = parameters;
   }
 
   public void unsetParameters() {

Modified: hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java (original)
+++ hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java Wed Sep 24 07:03:35 2014
@@ -216,17 +216,17 @@ public class StorageDescriptor implement
   {
     this();
     this.cols = cols;
-    this.location = org.apache.hive.common.util.HiveStringUtils.intern(location);
-    this.inputFormat = org.apache.hive.common.util.HiveStringUtils.intern(inputFormat);
-    this.outputFormat = org.apache.hive.common.util.HiveStringUtils.intern(outputFormat);
+    this.location = location;
+    this.inputFormat = inputFormat;
+    this.outputFormat = outputFormat;
     this.compressed = compressed;
     setCompressedIsSet(true);
     this.numBuckets = numBuckets;
     setNumBucketsIsSet(true);
     this.serdeInfo = serdeInfo;
-    this.bucketCols = org.apache.hive.common.util.HiveStringUtils.intern(bucketCols);
+    this.bucketCols = bucketCols;
     this.sortCols = sortCols;
-    this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+    this.parameters = parameters;
   }
 
   /**
@@ -242,13 +242,13 @@ public class StorageDescriptor implement
       this.cols = __this__cols;
     }
     if (other.isSetLocation()) {
-      this.location = org.apache.hive.common.util.HiveStringUtils.intern(other.location);
+      this.location = other.location;
     }
     if (other.isSetInputFormat()) {
-      this.inputFormat = org.apache.hive.common.util.HiveStringUtils.intern(other.inputFormat);
+      this.inputFormat = other.inputFormat;
     }
     if (other.isSetOutputFormat()) {
-      this.outputFormat = org.apache.hive.common.util.HiveStringUtils.intern(other.outputFormat);
+      this.outputFormat = other.outputFormat;
     }
     this.compressed = other.compressed;
     this.numBuckets = other.numBuckets;
@@ -276,9 +276,9 @@ public class StorageDescriptor implement
         String other_element_key = other_element.getKey();
         String other_element_value = other_element.getValue();
 
-        String __this__parameters_copy_key = org.apache.hive.common.util.HiveStringUtils.intern(other_element_key);
+        String __this__parameters_copy_key = other_element_key;
 
-        String __this__parameters_copy_value = org.apache.hive.common.util.HiveStringUtils.intern(other_element_value);
+        String __this__parameters_copy_value = other_element_value;
 
         __this__parameters.put(__this__parameters_copy_key, __this__parameters_copy_value);
       }
@@ -356,7 +356,7 @@ public class StorageDescriptor implement
   }
 
   public void setLocation(String location) {
-    this.location = org.apache.hive.common.util.HiveStringUtils.intern(location);
+    this.location = location;
   }
 
   public void unsetLocation() {
@@ -379,7 +379,7 @@ public class StorageDescriptor implement
   }
 
   public void setInputFormat(String inputFormat) {
-    this.inputFormat = org.apache.hive.common.util.HiveStringUtils.intern(inputFormat);
+    this.inputFormat = inputFormat;
   }
 
   public void unsetInputFormat() {
@@ -402,7 +402,7 @@ public class StorageDescriptor implement
   }
 
   public void setOutputFormat(String outputFormat) {
-    this.outputFormat = org.apache.hive.common.util.HiveStringUtils.intern(outputFormat);
+    this.outputFormat = outputFormat;
   }
 
   public void unsetOutputFormat() {
@@ -507,7 +507,7 @@ public class StorageDescriptor implement
   }
 
   public void setBucketCols(List<String> bucketCols) {
-    this.bucketCols = org.apache.hive.common.util.HiveStringUtils.intern(bucketCols);
+    this.bucketCols = bucketCols;
   }
 
   public void unsetBucketCols() {
@@ -579,7 +579,7 @@ public class StorageDescriptor implement
   }
 
   public void setParameters(Map<String,String> parameters) {
-    this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+    this.parameters = parameters;
   }
 
   public void unsetParameters() {

Modified: hive/trunk/ql/if/queryplan.thrift
URL: http://svn.apache.org/viewvc/hive/trunk/ql/if/queryplan.thrift?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/if/queryplan.thrift (original)
+++ hive/trunk/ql/if/queryplan.thrift Wed Sep 24 07:03:35 2014
@@ -59,6 +59,7 @@ enum OperatorType {
   EVENT,
   ORCFILEMERGE,
   RCFILEMERGE,
+  MERGEJOIN,
 }
 
 struct Operator {

Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp Wed Sep 24 07:03:35 2014
@@ -54,7 +54,8 @@ int _kOperatorTypeValues[] = {
   OperatorType::DEMUX,
   OperatorType::EVENT,
   OperatorType::ORCFILEMERGE,
-  OperatorType::RCFILEMERGE
+  OperatorType::RCFILEMERGE,
+  OperatorType::MERGEJOIN
 };
 const char* _kOperatorTypeNames[] = {
   "JOIN",
@@ -80,9 +81,10 @@ const char* _kOperatorTypeNames[] = {
   "DEMUX",
   "EVENT",
   "ORCFILEMERGE",
-  "RCFILEMERGE"
+  "RCFILEMERGE",
+  "MERGEJOIN"
 };
-const std::map<int, const char*> _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(24, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(25, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
 
 int _kTaskTypeValues[] = {
   TaskType::MAP,

Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h Wed Sep 24 07:03:35 2014
@@ -59,7 +59,8 @@ struct OperatorType {
     DEMUX = 20,
     EVENT = 21,
     ORCFILEMERGE = 22,
-    RCFILEMERGE = 23
+    RCFILEMERGE = 23,
+    MERGEJOIN = 24
   };
 };
 

Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java Wed Sep 24 07:03:35 2014
@@ -35,7 +35,8 @@ public enum OperatorType implements org.
   DEMUX(20),
   EVENT(21),
   ORCFILEMERGE(22),
-  RCFILEMERGE(23);
+  RCFILEMERGE(23),
+  MERGEJOIN(24);
 
   private final int value;
 
@@ -104,6 +105,8 @@ public enum OperatorType implements org.
         return ORCFILEMERGE;
       case 23:
         return RCFILEMERGE;
+      case 24:
+        return MERGEJOIN;
       default:
         return null;
     }

Modified: hive/trunk/ql/src/gen/thrift/gen-php/Types.php
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-php/Types.php?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-php/Types.php (original)
+++ hive/trunk/ql/src/gen/thrift/gen-php/Types.php Wed Sep 24 07:03:35 2014
@@ -59,6 +59,7 @@ final class OperatorType {
   const EVENT = 21;
   const ORCFILEMERGE = 22;
   const RCFILEMERGE = 23;
+  const MERGEJOIN = 24;
   static public $__names = array(
     0 => 'JOIN',
     1 => 'MAPJOIN',
@@ -84,6 +85,7 @@ final class OperatorType {
     21 => 'EVENT',
     22 => 'ORCFILEMERGE',
     23 => 'RCFILEMERGE',
+    24 => 'MERGEJOIN',
   );
 }
 

Modified: hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py (original)
+++ hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py Wed Sep 24 07:03:35 2014
@@ -69,6 +69,7 @@ class OperatorType:
   EVENT = 21
   ORCFILEMERGE = 22
   RCFILEMERGE = 23
+  MERGEJOIN = 24
 
   _VALUES_TO_NAMES = {
     0: "JOIN",
@@ -95,6 +96,7 @@ class OperatorType:
     21: "EVENT",
     22: "ORCFILEMERGE",
     23: "RCFILEMERGE",
+    24: "MERGEJOIN",
   }
 
   _NAMES_TO_VALUES = {
@@ -122,6 +124,7 @@ class OperatorType:
     "EVENT": 21,
     "ORCFILEMERGE": 22,
     "RCFILEMERGE": 23,
+    "MERGEJOIN": 24,
   }
 
 class TaskType:

Modified: hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb (original)
+++ hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb Wed Sep 24 07:03:35 2014
@@ -45,8 +45,9 @@ module OperatorType
   EVENT = 21
   ORCFILEMERGE = 22
   RCFILEMERGE = 23
-  VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT", 22 => "ORCFILEMERGE", 23 => "RCFILEMERGE"}
-  VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT, ORCFILEMERGE, RCFILEMERGE]).freeze
+  MERGEJOIN = 24
+  VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT", 22 => "ORCFILEMERGE", 23 => "RCFILEMERGE", 24 => "MERGEJOIN"}
+  VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT, ORCFILEMERGE, RCFILEMERGE, MERGEJOIN]).freeze
 end
 
 module TaskType

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Wed Sep 24 07:03:35 2014
@@ -61,13 +61,13 @@ public abstract class AbstractMapJoinOpe
   @Override
   @SuppressWarnings("unchecked")
   protected void initializeOp(Configuration hconf) throws HiveException {
-    int tagLen = conf.getTagLength();
-
-    joinKeys = new List[tagLen];
-
-    JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE);
-    joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
-        inputObjInspectors,NOTSKIPBIGTABLE, tagLen);
+    if (conf.getGenJoinKeys()) {
+      int tagLen = conf.getTagLength();
+      joinKeys = new List[tagLen];
+      JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE);
+      joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
+          inputObjInspectors,NOTSKIPBIGTABLE, tagLen);
+    }
 
     super.initializeOp(hconf);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Wed Sep 24 07:03:35 2014
@@ -323,7 +323,6 @@ public abstract class CommonJoinOperator
 
   @Override
   public void startGroup() throws HiveException {
-    LOG.trace("Join: Starting new group");
     newGroupStarted = true;
     for (AbstractRowContainer<List<Object>> alw : storage) {
       alw.clearRows();
@@ -632,8 +631,6 @@ public abstract class CommonJoinOperator
    */
   @Override
   public void endGroup() throws HiveException {
-    LOG.trace("Join Op: endGroup called: numValues=" + numAliases);
-
     checkAndGenObject();
   }
 
@@ -719,7 +716,6 @@ public abstract class CommonJoinOperator
 
         if (noOuterJoin) {
           if (alw.rowCount() == 0) {
-            LOG.trace("No data for alias=" + i);
             return;
           } else if (alw.rowCount() > 1) {
             mayHasMoreThanOne = true;
@@ -776,7 +772,6 @@ public abstract class CommonJoinOperator
    */
   @Override
   public void closeOp(boolean abort) throws HiveException {
-    LOG.trace("Join Op close");
     for (AbstractRowContainer<List<Object>> alw : storage) {
       if (alw != null) {
         alw.clearRows(); // clean up the temp files

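The CommonJoinOperator hunks above delete per-group and per-row LOG.trace calls from the join hot path; even with trace disabled, each call would still build its message string. For reference, a sketch of the usual guarded alternative (this commit removes the calls outright rather than guarding them):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class GuardedTrace {
      private static final Log LOG = LogFactory.getLog(GuardedTrace.class);

      void endGroup(int numAliases) {
        // The guard skips the string concatenation entirely when trace
        // logging is off, instead of building a message and discarding it.
        if (LOG.isTraceEnabled()) {
          LOG.trace("Join Op: endGroup called: numValues=" + numAliases);
        }
      }
    }
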
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,507 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+/*
+ * This operator is introduced to consolidate the join algorithms into either hash-based joins
+ * (MapJoinOperator) or sort-merge based joins. It executes a sort-merge based algorithm and
+ * replaces both the JoinOperator and the SMBMapJoinOperator on the Tez side. It works in either
+ * the map phase or the reduce phase.
+ *
+ * The basic algorithm is as follows:
+ *
+ * 1. The processOp receives a row from a "big" table.
+ * 2. In order to process it, the operator does a fetch for rows from the other tables.
+ * 3. Once we have a set of rows from the other tables (till we hit a new key), more rows are
+ *    brought in from the big table and a join is performed.
+ */
+
+public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMergeJoinDesc> implements
+    Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private boolean isBigTableWork;
+  private static final Log LOG = LogFactory.getLog(CommonMergeJoinOperator.class.getName());
+  private Map<Integer, String> aliasToInputNameMap;
+  transient List<Object>[] keyWritables;
+  transient List<Object>[] nextKeyWritables;
+  transient RowContainer<List<Object>>[] nextGroupStorage;
+  transient RowContainer<List<Object>>[] candidateStorage;
+
+  transient String[] tagToAlias;
+  private transient boolean[] fetchDone;
+  private transient boolean[] foundNextKeyGroup;
+  transient boolean firstFetchHappened = false;
+  transient boolean localWorkInited = false;
+  transient boolean initDone = false;
+  transient List<Object> otherKey = null;
+  transient List<Object> values = null;
+  transient RecordSource[] sources;
+  transient List<Operator<? extends OperatorDesc>> originalParents =
+      new ArrayList<Operator<? extends OperatorDesc>>();
+
+  public CommonMergeJoinOperator() {
+    super();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+    firstFetchHappened = false;
+    initializeChildren(hconf);
+    int maxAlias = 0;
+    for (byte pos = 0; pos < order.length; pos++) {
+      if (pos > maxAlias) {
+        maxAlias = pos;
+      }
+    }
+    maxAlias += 1;
+
+    nextGroupStorage = new RowContainer[maxAlias];
+    candidateStorage = new RowContainer[maxAlias];
+    keyWritables = new ArrayList[maxAlias];
+    nextKeyWritables = new ArrayList[maxAlias];
+    fetchDone = new boolean[maxAlias];
+    foundNextKeyGroup = new boolean[maxAlias];
+
+    int bucketSize;
+
+    int oldVar = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE);
+    if (oldVar != 100) {
+      bucketSize = oldVar;
+    } else {
+      bucketSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESMBJOINCACHEROWS);
+    }
+
+    for (byte pos = 0; pos < order.length; pos++) {
+      RowContainer<List<Object>> rc =
+          JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], pos,
+              bucketSize, spillTableDesc, conf, !hasFilter(pos), reporter);
+      nextGroupStorage[pos] = rc;
+      RowContainer<List<Object>> candidateRC =
+          JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], pos,
+              bucketSize, spillTableDesc, conf, !hasFilter(pos), reporter);
+      candidateStorage[pos] = candidateRC;
+    }
+
+    for (byte pos = 0; pos < order.length; pos++) {
+      if (pos != posBigTable) {
+        fetchDone[pos] = false;
+      }
+      foundNextKeyGroup[pos] = false;
+    }
+
+    sources = ((TezContext) MapredContext.get()).getRecordSources();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hive.ql.exec.Operator#processOp(java.lang.Object,
+   * int) This operator has a push-pull model. The first call to this method is
+   * a push, but the rest are pulled until we run out of records.
+   */
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    posBigTable = (byte) conf.getBigTablePosition();
+
+    byte alias = (byte) tag;
+    List<Object> value = getFilteredValue(alias, row);
+    // compute keys and values as StandardObjects
+    List<Object> key = mergeJoinComputeKeys(row, alias);
+
+    if (!firstFetchHappened) {
+      firstFetchHappened = true;
+      // fetch the first group for all small table aliases
+      for (byte pos = 0; pos < order.length; pos++) {
+        if (pos != posBigTable) {
+          fetchNextGroup(pos);
+        }
+      }
+    }
+
+    // have we reached a new key group?
+    boolean nextKeyGroup = processKey(alias, key);
+    if (nextKeyGroup) {
+      //assert this.nextGroupStorage[alias].size() == 0;
+      this.nextGroupStorage[alias].addRow(value);
+      foundNextKeyGroup[tag] = true;
+      if (tag != posBigTable) {
+        return;
+      }
+    }
+
+    reportProgress();
+    numMapRowsRead++;
+
+    // the big table has reached a new key group. try to let the small tables
+    // catch up with the big table.
+    if (nextKeyGroup) {
+      assert tag == posBigTable;
+      List<Byte> smallestPos = null;
+      do {
+        smallestPos = joinOneGroup();
+        // jump out of the loop if we need input from the big table
+      } while (smallestPos != null && smallestPos.size() > 0
+          && !smallestPos.contains(this.posBigTable));
+
+      return;
+    }
+
+    assert !nextKeyGroup;
+    candidateStorage[tag].addRow(value);
+
+  }
+
+  private List<Byte> joinOneGroup() throws HiveException {
+    int[] smallestPos = findSmallestKey();
+    List<Byte> listOfNeedFetchNext = null;
+    if (smallestPos != null) {
+      listOfNeedFetchNext = joinObject(smallestPos);
+      if (listOfNeedFetchNext.size() > 0) {
+        // listOfNeedFetchNext contains all tables whose candidateStorage data has
+        // been joined; we need to clear their candidate storage, promote their
+        // nextGroupStorage to candidateStorage, and fetch data until we reach a
+        // new group.
+        for (Byte b : listOfNeedFetchNext) {
+          try {
+            fetchNextGroup(b);
+          } catch (Exception e) {
+            throw new HiveException(e);
+          }
+        }
+      }
+    }
+    return listOfNeedFetchNext;
+  }
+
+  private List<Byte> joinObject(int[] smallestPos) throws HiveException {
+    List<Byte> needFetchList = new ArrayList<Byte>();
+    byte index = (byte) (smallestPos.length - 1);
+    for (; index >= 0; index--) {
+      if (smallestPos[index] > 0 || keyWritables[index] == null) {
+        putDummyOrEmpty(index);
+        continue;
+      }
+      storage[index] = candidateStorage[index];
+      needFetchList.add(index);
+      if (smallestPos[index] < 0) {
+        break;
+      }
+    }
+    for (index--; index >= 0; index--) {
+      putDummyOrEmpty(index);
+    }
+    checkAndGenObject();
+    for (Byte pos : needFetchList) {
+      this.candidateStorage[pos].clearRows();
+      this.keyWritables[pos] = null;
+    }
+    return needFetchList;
+  }
+
+  private void putDummyOrEmpty(Byte i) {
+    // put an empty list or null
+    if (noOuterJoin) {
+      storage[i] = emptyList;
+    } else {
+      storage[i] = dummyObjVectors[i];
+    }
+  }
+
+  private int[] findSmallestKey() {
+    int[] result = new int[order.length];
+    List<Object> smallestOne = null;
+
+    for (byte pos = 0; pos < order.length; pos++) {
+      List<Object> key = keyWritables[pos];
+      if (key == null) {
+        continue;
+      }
+      if (smallestOne == null) {
+        smallestOne = key;
+        result[pos] = -1;
+        continue;
+      }
+      result[pos] = compareKeys(key, smallestOne);
+      if (result[pos] < 0) {
+        smallestOne = key;
+      }
+    }
+    return smallestOne == null ? null : result;
+  }
+
+  private void fetchNextGroup(Byte t) throws HiveException {
+    if (foundNextKeyGroup[t]) {
+      // first promote the next group to be the current group if we reached a
+      // new group in the previous fetch
+      if ((this.nextKeyWritables[t] != null) || (this.fetchDone[t] == false)) {
+        promoteNextGroupToCandidate(t);
+      } else {
+        this.keyWritables[t] = null;
+        this.candidateStorage[t] = null;
+        this.nextGroupStorage[t] = null;
+      }
+      foundNextKeyGroup[t] = false;
+    }
+    // for the big table, we only need to promote the next group to the current group.
+    if (t == posBigTable) {
+      return;
+    }
+
+    // for tables other than the big table, we need to fetch more data until we
+    // reach a new group or are done.
+    while (!foundNextKeyGroup[t]) {
+      if (fetchDone[t]) {
+        break;
+      }
+      fetchOneRow(t);
+    }
+    if (!foundNextKeyGroup[t] && fetchDone[t]) {
+      this.nextKeyWritables[t] = null;
+    }
+  }
+
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    joinFinalLeftData();
+
+    // clean up
+    for (int pos = 0; pos < order.length; pos++) {
+      if (pos != posBigTable) {
+        fetchDone[pos] = false;
+      }
+      foundNextKeyGroup[pos] = false;
+    }
+  }
+
+  private void fetchOneRow(byte tag) throws HiveException {
+    try {
+      fetchDone[tag] = !sources[tag].pushRecord();
+      if (sources[tag].isGrouped()) {
+        // instead of maintaining complex state for the fetch of the next group,
+        // we know for sure that at the end of all the values for a given key,
+        // we will definitely reach the next key group.
+        foundNextKeyGroup[tag] = true;
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  private void joinFinalLeftData() throws HiveException {
+    @SuppressWarnings("rawtypes")
+    RowContainer bigTblRowContainer = this.candidateStorage[this.posBigTable];
+
+    boolean allFetchDone = allFetchDone();
+    // if all the remaining data in the small tables is less than or equal to the
+    // remaining data in the big table, let them catch up
+    while (bigTblRowContainer != null && bigTblRowContainer.rowCount() > 0 && !allFetchDone) {
+      joinOneGroup();
+      bigTblRowContainer = this.candidateStorage[this.posBigTable];
+      allFetchDone = allFetchDone();
+    }
+
+    while (!allFetchDone) {
+      List<Byte> ret = joinOneGroup();
+      if (ret == null || ret.size() == 0) {
+        break;
+      }
+      reportProgress();
+      numMapRowsRead++;
+      allFetchDone = allFetchDone();
+    }
+
+    boolean dataInCache = true;
+    while (dataInCache) {
+      for (byte pos = 0; pos < order.length; pos++) {
+        if (this.foundNextKeyGroup[pos] && this.nextKeyWritables[pos] != null) {
+          promoteNextGroupToCandidate(pos);
+        }
+      }
+      joinOneGroup();
+      dataInCache = false;
+      for (byte pos = 0; pos < order.length; pos++) {
+        if (this.candidateStorage[pos].rowCount() > 0) {
+          dataInCache = true;
+          break;
+        }
+      }
+    }
+  }
+
+  private boolean allFetchDone() {
+    boolean allFetchDone = true;
+    for (byte pos = 0; pos < order.length; pos++) {
+      if (pos == posBigTable) {
+        continue;
+      }
+      allFetchDone = allFetchDone && fetchDone[pos];
+    }
+    return allFetchDone;
+  }
+
+  private void promoteNextGroupToCandidate(Byte t) throws HiveException {
+    this.keyWritables[t] = this.nextKeyWritables[t];
+    this.nextKeyWritables[t] = null;
+    RowContainer<List<Object>> oldRowContainer = this.candidateStorage[t];
+    oldRowContainer.clearRows();
+    this.candidateStorage[t] = this.nextGroupStorage[t];
+    this.nextGroupStorage[t] = oldRowContainer;
+  }
+
+  private boolean processKey(byte alias, List<Object> key) throws HiveException {
+    List<Object> keyWritable = keyWritables[alias];
+    if (keyWritable == null) {
+      // the first group.
+      keyWritables[alias] = key;
+      return false;
+    } else {
+      int cmp = compareKeys(key, keyWritable);
+      if (cmp != 0) {
+        nextKeyWritables[alias] = key;
+        return true;
+      }
+      return false;
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  private int compareKeys(List<Object> k1, List<Object> k2) {
+    int ret = 0;
+
+    // do the join keys have different sizes?
+    ret = k1.size() - k2.size();
+    if (ret != 0) {
+      return ret;
+    }
+
+    for (int i = 0; i < k1.size(); i++) {
+      WritableComparable key_1 = (WritableComparable) k1.get(i);
+      WritableComparable key_2 = (WritableComparable) k2.get(i);
+      if (key_1 == null && key_2 == null) {
+        return nullsafes != null && nullsafes[i] ? 0 : -1; // just treat k1 as
+                                                           // smaller than k2
+      } else if (key_1 == null) {
+        return -1;
+      } else if (key_2 == null) {
+        return 1;
+      }
+      ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2);
+      if (ret != 0) {
+        return ret;
+      }
+    }
+    return ret;
+  }
+
+  @SuppressWarnings("unchecked")
+  private List<Object> mergeJoinComputeKeys(Object row, Byte alias) throws HiveException {
+    if ((joinKeysObjectInspectors != null) && (joinKeysObjectInspectors[alias] != null)) {
+      return JoinUtil.computeKeys(row, joinKeys[alias], joinKeysObjectInspectors[alias]);
+    } else {
+      row =
+          ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[alias],
+              ObjectInspectorCopyOption.WRITABLE);
+      StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[alias];
+      StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString());
+      return (List<Object>) soi.getStructFieldData(row, sf);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "MERGEJOIN";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.MERGEJOIN;
+  }
+
+  @Override
+  public void initializeLocalWork(Configuration hconf) throws HiveException {
+    Operator<? extends OperatorDesc> parent = null;
+
+    for (Operator<? extends OperatorDesc> parentOp : parentOperators) {
+      if (parentOp != null) {
+        parent = parentOp;
+        break;
+      }
+    }
+
+    if (parent == null) {
+      throw new HiveException("No valid parents.");
+    }
+    Map<Integer, DummyStoreOperator> dummyOps = parent.getTagToOperatorTree();
+    for (Entry<Integer, DummyStoreOperator> connectOp : dummyOps.entrySet()) {
+      parentOperators.add(connectOp.getKey(), connectOp.getValue());
+      connectOp.getValue().getChildOperators().add(this);
+    }
+    super.initializeLocalWork(hconf);
+    return;
+  }
+
+  public boolean isBigTableWork() {
+    return isBigTableWork;
+  }
+
+  public void setIsBigTableWork(boolean bigTableWork) {
+    this.isBigTableWork = bigTableWork;
+  }
+
+  public int getTagForOperator(Operator<? extends OperatorDesc> op) {
+    return originalParents.indexOf(op);
+  }
+
+  public void cloneOriginalParentsList(List<Operator<? extends OperatorDesc>> opList) {
+    originalParents.addAll(opList);
+  }
+}

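The class comment at the top of CommonMergeJoinOperator describes the algorithm: the big table pushes rows in, the other sides are fetched group by group, and whichever side holds the smallest key advances. A stripped-down sketch of that merge loop for an inner equi-join of two inputs already sorted by an int key (the real operator additionally handles multiple tables, composite keys, outer joins, and spillable row containers):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MergeJoinSketch {

      static class Row {
        final int key;
        final String value;
        Row(int key, String value) { this.key = key; this.value = value; }
      }

      // Advance the side with the smaller key; on a match, emit the cross
      // product of the two key groups, then move both sides past the group.
      static List<String> mergeJoin(List<Row> big, List<Row> small) {
        List<String> out = new ArrayList<String>();
        int i = 0, j = 0;
        while (i < big.size() && j < small.size()) {
          int cmp = Integer.compare(big.get(i).key, small.get(j).key);
          if (cmp < 0) { i++; continue; }   // big side catches up
          if (cmp > 0) { j++; continue; }   // small side catches up
          int key = big.get(i).key;
          int jStart = j;
          for (; i < big.size() && big.get(i).key == key; i++) {
            for (j = jStart; j < small.size() && small.get(j).key == key; j++) {
              out.add(big.get(i).value + "," + small.get(j).value);
            }
          }
        }
        return out;
      }

      public static void main(String[] args) {
        List<Row> big = Arrays.asList(new Row(1, "a"), new Row(2, "b"),
            new Row(2, "c"), new Row(4, "d"));
        List<Row> small = Arrays.asList(new Row(2, "x"), new Row(3, "y"),
            new Row(4, "z"));
        System.out.println(mergeJoin(big, small)); // [b,x, c,x, d,z]
      }
    }
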
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java Wed Sep 24 07:03:35 2014
@@ -65,7 +65,7 @@ import org.apache.hadoop.hive.serde2.obj
  */
 public class DummyStoreOperator extends Operator<DummyStoreDesc> implements Serializable {
 
-  private transient InspectableObject result;
+  protected transient InspectableObject result;
 
   public DummyStoreOperator() {
     super();

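Widening result from private to protected is what lets a subclass reach the stored row; the new TezDummyStoreOperator added by this commit is such a subclass. A trivial sketch of the access change (the class bodies are illustrative, not the actual operators; in Hive both classes share a package, so the key point is simply that a private field is not visible from the subclass):

    class BaseOperatorSketch {
      // Before this commit the field was private, so the subclass below
      // could not reference it; protected makes it visible to subclasses.
      protected Object result;
    }

    class TezSubclassSketch extends BaseOperatorSketch {
      Object peekResult() {
        return result; // legal only because the field is now protected
      }
    }
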
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Wed Sep 24 07:03:35 2014
@@ -165,7 +165,7 @@ public class FetchOperator implements Se
 
   private void setupExecContext() {
     if (hasVC || work.getSplitSample() != null) {
-      context = new ExecMapperContext();
+      context = new ExecMapperContext(job);
       if (operator != null) {
         operator.setExecContext(context);
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Wed Sep 24 07:03:35 2014
@@ -76,7 +76,7 @@ public class FilterOperator extends Oper
       statsMap.put(Counter.FILTERED, filtered_count);
       statsMap.put(Counter.PASSED, passed_count);
       conditionInspector = null;
-      ioContext = IOContext.get();
+      ioContext = IOContext.get(hconf.get(Utilities.INPUT_NAME));
     } catch (Throwable e) {
       throw new HiveException(e);
     }

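FilterOperator now looks its IOContext up by the input name carried in the configuration, rather than using a single shared instance, presumably because a Tez task can now drive more than one input. A minimal sketch of a per-name context registry of that shape (the map-based registry is an illustration, not the actual IOContext implementation):

    import java.util.concurrent.ConcurrentHashMap;

    public class ContextRegistrySketch {

      public static class Ctx {
        // per-input reader state would live here
      }

      private static final ConcurrentHashMap<String, Ctx> CONTEXTS =
          new ConcurrentHashMap<String, Ctx>();

      // One shared instance per input name, so operators wired to different
      // inputs of the same task no longer stomp on each other's state.
      public static Ctx get(String inputName) {
        Ctx ctx = CONTEXTS.get(inputName);
        if (ctx == null) {
          ctx = new Ctx();
          Ctx prior = CONTEXTS.putIfAbsent(inputName, ctx);
          if (prior != null) {
            ctx = prior;
          }
        }
        return ctx;
      }
    }
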
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Wed Sep 24 07:03:35 2014
@@ -171,8 +171,9 @@ public class MapJoinOperator extends Abs
 
   private void loadHashTable() throws HiveException {
 
-    if (this.getExecContext().getLocalWork() == null
-        || !this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
+    if ((this.getExecContext() != null)
+        && ((this.getExecContext().getLocalWork() == null) || (!this.getExecContext()
+            .getLocalWork().getInputFileChangeSensitive()))) {
       if (hashTblInitedOnce) {
         return;
       } else {
@@ -313,8 +314,8 @@ public class MapJoinOperator extends Abs
         tableContainer.dumpMetrics();
       }
     }
-    if ((this.getExecContext().getLocalWork() != null
-        && this.getExecContext().getLocalWork().getInputFileChangeSensitive())
+    if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
+        && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())
         && mapJoinTables != null) {
       for (MapJoinTableContainer tableContainer : mapJoinTables) {
         if (tableContainer != null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Sep 24 07:03:35 2014
@@ -33,9 +33,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -181,7 +182,7 @@ public class MapOperator extends Operato
 
     PartitionDesc pd = ctx.partDesc;
     TableDesc td = pd.getTableDesc();
-    
+
     MapOpCtx opCtx = new MapOpCtx();
     // Use table properties in case of unpartitioned tables,
     // and the union of table properties and partition properties, with partition
@@ -205,42 +206,42 @@ public class MapOperator extends Operato
 
     opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
         partRawRowObjectInspector, opCtx.tblRawRowObjectInspector);
-    
+
     // Next check if this table has partitions and if so
     // get the list of partition names as well as allocate
     // the serdes for the partition columns
     String pcols = overlayedProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-    
+
     if (pcols != null && pcols.length() > 0) {
       String[] partKeys = pcols.trim().split("/");
       String pcolTypes = overlayedProps
           .getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
       String[] partKeyTypes = pcolTypes.trim().split(":");
-      
+
       if (partKeys.length > partKeyTypes.length) {
           throw new HiveException("Internal error : partKeys length, " +partKeys.length +
                   " greater than partKeyTypes length, " + partKeyTypes.length);
       }
-      
+
       List<String> partNames = new ArrayList<String>(partKeys.length);
       Object[] partValues = new Object[partKeys.length];
       List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
-      
+
       for (int i = 0; i < partKeys.length; i++) {
         String key = partKeys[i];
         partNames.add(key);
         ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector
             (TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-        
+
         // Partitions do not exist for this table
         if (partSpec == null) {
           // for partitionless table, initialize partValue to null
           partValues[i] = null;
         } else {
-            partValues[i] = 
+            partValues[i] =
                 ObjectInspectorConverters.
                 getConverter(PrimitiveObjectInspectorFactory.
-                    javaStringObjectInspector, oi).convert(partSpec.get(key)); 
+                    javaStringObjectInspector, oi).convert(partSpec.get(key));
         }
         partObjectInspectors.add(oi);
       }
@@ -337,13 +338,8 @@ public class MapOperator extends Operato
     return tableDescOI;
   }
 
-  private boolean isPartitioned(PartitionDesc pd) {
-    return pd.getPartSpec() != null && !pd.getPartSpec().isEmpty();
-  }
-
   public void setChildren(Configuration hconf) throws HiveException {
-
-    Path fpath = IOContext.get().getInputPath();
+    Path fpath = IOContext.get(hconf.get(Utilities.INPUT_NAME)).getInputPath();
 
     boolean schemeless = fpath.toUri().getScheme() == null;
 
@@ -639,4 +635,8 @@ public class MapOperator extends Operato
     return null;
   }
 
+  @Override
+  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
+    return MapRecordProcessor.getConnectOps();
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Sep 24 07:03:35 2014
@@ -146,6 +146,7 @@ public abstract class Operator<T extends
   /**
    * Implements the getChildren function for the Node Interface.
    */
+  @Override
   public ArrayList<Node> getChildren() {
 
     if (getChildOperators() == null) {
@@ -851,6 +852,7 @@ public abstract class Operator<T extends
    *
    * @return the name of the operator
    */
+  @Override
   public String getName() {
     return getOperatorName();
   }
@@ -1061,7 +1063,7 @@ public abstract class Operator<T extends
 
     if (parents != null) {
       for (Operator<? extends OperatorDesc> parent : parents) {
-        parentClones.add((Operator<? extends OperatorDesc>)(parent.clone()));
+        parentClones.add((parent.clone()));
       }
     }
 
@@ -1082,8 +1084,8 @@ public abstract class Operator<T extends
   public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
     T descClone = (T) conf.clone();
     Operator<? extends OperatorDesc> ret =
-        (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
-            descClone, getSchema());
+        OperatorFactory.getAndMakeChild(
+        descClone, getSchema());
     return ret;
   }
 
@@ -1254,15 +1256,15 @@ public abstract class Operator<T extends
     }
     return null;
   }
-  
+
   public OpTraits getOpTraits() {
     if (conf != null) {
       return conf.getOpTraits();
     }
-    
+
     return null;
   }
-  
+
   public void setOpTraits(OpTraits metaInfo) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Setting traits ("+metaInfo+") on "+this);
@@ -1299,7 +1301,17 @@ public abstract class Operator<T extends
 
   private static class DummyOperator extends Operator {
     public DummyOperator() { super("dummy"); }
+    @Override
     public void processOp(Object row, int tag) { }
+    @Override
     public OperatorType getType() { return null; }
   }
+
+  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
+    if ((parentOperators == null) || (parentOperators.size() == 0)) {
+      return null;
+    }
+    Map<Integer, DummyStoreOperator> dummyOps = parentOperators.get(0).getTagToOperatorTree();
+    return dummyOps;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Sep 24 07:03:35 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -115,6 +116,8 @@ public final class OperatorFactory {
         RCFileMergeOperator.class));
     opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
         OrcFileMergeOperator.class));
+    opvec.add(new OpTuple<CommonMergeJoinDesc>(CommonMergeJoinDesc.class,
+        CommonMergeJoinOperator.class));
   }
 
   static {

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * A dummy store operator, the same as the regular dummy store operator but for Tez. It exists so
+ * that we don't have to check for Tez every time before forwarding a record. On Tez, records flow
+ * down from the dummy store operator in the processOp phase, unlike in MapReduce.
+ *
+ */
+public class TezDummyStoreOperator extends DummyStoreOperator {
+
+  /**
+   * Unlike the MR counterpart, on Tez we want processOp to forward
+   * the records.
+   */
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    super.processOp(row, tag);
+    forward(result.o, outputObjInspector);
+  }
+}

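To make the new behavior concrete, here is a small self-contained toy (plain Java, not Hive code; all names are illustrative) contrasting the MR dummy store, which only buffers the last row for the SMB join to fetch, with the Tez variant, which buffers the row and then pushes it to its child:

public class DummyStoreSketch {
  interface RowSink { void accept(Object row); }

  // Models DummyStoreOperator on MR: the row is only buffered; the
  // SMB join fetches it when it needs the small-table side.
  static class BufferingStore {
    Object result;
    void processOp(Object row) { result = row; }
  }

  // Models TezDummyStoreOperator: buffer as before, but also forward,
  // because on Tez records flow down through processOp.
  static class ForwardingStore extends BufferingStore {
    final RowSink child;
    ForwardingStore(RowSink child) { this.child = child; }
    @Override
    void processOp(Object row) {
      super.processOp(row); // keep the buffered copy
      child.accept(result); // push it downstream as well
    }
  }

  public static void main(String[] args) {
    RowSink printer = new RowSink() {
      @Override
      public void accept(Object row) { System.out.println("forwarded: " + row); }
    };
    new ForwardingStore(printer).processOp("row-1"); // prints: forwarded: row-1
  }
}
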
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Sep 24 07:03:35 2014
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -200,6 +201,8 @@ public final class Utilities {
   public static String HADOOP_LOCAL_FS = "file:///";
   public static String MAP_PLAN_NAME = "map.xml";
   public static String REDUCE_PLAN_NAME = "reduce.xml";
+  public static String MERGE_PLAN_NAME = "merge.xml";
+  public static final String INPUT_NAME = "iocontext.input.name";
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
 
@@ -290,6 +293,39 @@ public final class Utilities {
     return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
   }
 
+  public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir,
+      boolean useCache) {
+    for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) {
+      setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, useCache);
+      String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+      if (prefixes == null) {
+        prefixes = baseWork.getName();
+      } else {
+        prefixes = prefixes + "," + baseWork.getName();
+      }
+      conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes);
+    }
+
+    // nothing to return
+    return null;
+  }
+
+  public static BaseWork getMergeWork(JobConf jconf) {
+    if ((jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX) == null)
+        || (jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX).isEmpty())) {
+      return null;
+    }
+    return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX));
+  }
+
+  public static BaseWork getMergeWork(JobConf jconf, String prefix) {
+    if (prefix == null || prefix.isEmpty()) {
+      return null;
+    }
+
+    return getBaseWork(jconf, prefix + MERGE_PLAN_NAME);
+  }
+
   public static void cacheBaseWork(Configuration conf, String name, BaseWork work,
       Path hiveScratchDir) {
     try {
@@ -368,6 +404,8 @@ public final class Utilities {
             throw new RuntimeException("unable to determine work from configuration ."
                 + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
           }
+        } else if (name.contains(MERGE_PLAN_NAME)) {
+          gWork = deserializePlan(in, MapWork.class, conf);
         }
         gWorkMap.put(path, gWork);
       } else {
@@ -600,8 +638,14 @@ public final class Utilities {
   }
 
   public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) {
+    String useName = conf.get(INPUT_NAME);
+    if (useName == null) {
+      useName = "mapreduce";
+    }
+    conf.set(INPUT_NAME, useName);
     setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
     if (w.getReduceWork() != null) {
+      conf.set(INPUT_NAME, useName);
       setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true);
     }
   }
@@ -1838,7 +1882,7 @@ public final class Utilities {
 
       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
-            + " is not a direcgtory";
+            + " is not a directory";
         FileStatus[] items = fs.listStatus(parts[i].getPath());
 
         // remove empty directory since DP insert should not generate empty partitions.

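The prefix bookkeeping in setMergeWork lends itself to a short model. In this sketch a HashMap stands in for JobConf and the literal key is illustrative (the real key is the DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES constant, whose value is not shown in this diff); serializing each plan as name + MERGE_PLAN_NAME is elided:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class MergePrefixSketch {
  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<String, String>();
    for (String workName : Arrays.asList("Map 1", "Map 2")) {
      // (plan serialization to workName + "merge.xml" omitted)
      String prefixes = conf.get("tez.merge.work.prefixes"); // illustrative key
      conf.put("tez.merge.work.prefixes",
          prefixes == null ? workName : prefixes + "," + workName);
    }
    // getMergeWork(jconf, prefix) later loads prefix + "merge.xml" for
    // each comma-separated prefix recorded here.
    System.out.println(conf); // {tez.merge.work.prefixes=Map 1,Map 2}
  }
}
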
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Wed Sep 24 07:03:35 2014
@@ -78,10 +78,11 @@ public class ExecMapper extends MapReduc
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
 
-  private final ExecMapperContext execContext = new ExecMapperContext();
+  private ExecMapperContext execContext = null;
 
   @Override
   public void configure(JobConf job) {
+    execContext = new ExecMapperContext(job);
     // Allocate the bean at the beginning -
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
@@ -292,6 +293,7 @@ public class ExecMapper extends MapReduc
       this.rp = rp;
     }
 
+    @Override
     public void func(Operator op) {
       Map<Enum<?>, Long> opStats = op.getStats();
       for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java Wed Sep 24 07:03:35 2014
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.mapred.JobConf;
@@ -60,8 +61,9 @@ public class ExecMapperContext {
     this.currentBigBucketFile = currentBigBucketFile;
   }
 
-  public ExecMapperContext() {
-    ioCxt = IOContext.get();
+  public ExecMapperContext(JobConf jc) {
+    this.jc = jc;
+    ioCxt = IOContext.get(jc.get(Utilities.INPUT_NAME));
   }
 
   public void clear() {

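With this change, an ExecMapperContext (like FilterOperator and MapOperator above) resolves its IOContext by the input name stored under Utilities.INPUT_NAME rather than through a single shared instance, so a Tez task hosting several inputs can keep one context per input. The diff does not show IOContext.get(String) itself; the keyed registry below is only a plausible model of that behavior, not the actual implementation:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class KeyedContextSketch {
  static class Context { /* stands in for per-input IOContext state */ }

  private static final ConcurrentMap<String, Context> CONTEXTS =
      new ConcurrentHashMap<String, Context>();

  // One context per input name; callers would pass conf.get(Utilities.INPUT_NAME).
  static Context get(String inputName) {
    String key = (inputName == null) ? "default" : inputName;
    Context ctx = CONTEXTS.get(key);
    if (ctx == null) {
      CONTEXTS.putIfAbsent(key, new Context());
      ctx = CONTEXTS.get(key);
    }
    return ctx;
  }

  public static void main(String[] args) {
    System.out.println(get("Map 1") == get("Map 1")); // true: same input, same context
    System.out.println(get("Map 1") == get("Map 2")); // false: inputs are isolated
  }
}
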
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Wed Sep 24 07:03:35 2014
@@ -91,7 +91,7 @@ public class MapredLocalTask extends Tas
 
   // not sure we need this exec context; but all the operators in the work
   // will pass this context through
-  private ExecMapperContext execContext = new ExecMapperContext();
+  private ExecMapperContext execContext = null;
 
   private Process executor;
 
@@ -113,6 +113,7 @@ public class MapredLocalTask extends Tas
   public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
     super.initialize(conf, queryPlan, driverContext);
     job = new JobConf(conf, ExecDriver.class);
+    execContext = new ExecMapperContext(job);
     //we don't use the HadoopJobExecHooks for local tasks
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null);
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Wed Sep 24 07:03:35 2014
@@ -31,7 +31,10 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.FileSplit;
@@ -79,9 +82,14 @@ public class CustomPartitionVertex exten
   private List<InputDataInformationEvent> dataInformationEvents;
   private int numBuckets = -1;
   private Configuration conf = null;
-  private boolean rootVertexInitialized = false;
   private final SplitGrouper grouper = new SplitGrouper();
   private int taskCount = 0;
+  private VertexType vertexType;
+  private String mainWorkName;
+  private final Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
+
+  private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap =
+      new HashMap<String, Multimap<Integer, InputSplit>>();
 
   public CustomPartitionVertex(VertexManagerPluginContext context) {
     super(context);
@@ -90,8 +98,18 @@ public class CustomPartitionVertex exten
   @Override
   public void initialize() {
     this.context = getContext();
-    ByteBuffer byteBuf = context.getUserPayload().getPayload();
-    this.numBuckets = byteBuf.getInt();
+    ByteBuffer payload = context.getUserPayload().getPayload();
+    CustomVertexConfiguration vertexConf = new CustomVertexConfiguration();
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(payload);
+    try {
+      vertexConf.readFields(dibb);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.numBuckets = vertexConf.getNumBuckets();
+    this.mainWorkName = vertexConf.getInputName();
+    this.vertexType = vertexConf.getVertexType();
   }
 
   @Override
@@ -113,17 +131,12 @@ public class CustomPartitionVertex exten
   public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
   }
 
-  // One call per root Input - and for now only one is handled.
+  // One call per root Input
   @Override
   public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
       List<Event> events) {
+    LOG.info("On root vertex initialized " + inputName);
 
-    // Ideally, since there's only 1 Input expected at the moment -
-    // ensure this method is called only once. Tez will call it once per Root
-    // Input.
-    Preconditions.checkState(rootVertexInitialized == false);
-    LOG.info("Root vertex not initialized");
-    rootVertexInitialized = true;
     try {
       // This is using the payload from the RootVertexInitializer corresponding
       // to InputName. Ideally it should be using it's own configuration class -
@@ -164,9 +177,6 @@ public class CustomPartitionVertex exten
         // No tasks should have been started yet. Checked by initial state
         // check.
         Preconditions.checkState(dataInformationEventSeen == false);
-        Preconditions
-            .checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
-                "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
         InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
 
         // The vertex cannot be configured until all DataEvents are seen - to
@@ -220,21 +230,55 @@ public class CustomPartitionVertex exten
             (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
         Multimap<Integer, InputSplit> groupedSplit =
             HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
-            availableSlots);
+                availableSlots, inputName);
         bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
       }
 
-      LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap.size() + " tasks");
-      processAllEvents(inputName, bucketToGroupedSplitMap);
+      LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap);
+      if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) {
+        /*
+         * this is the small table side. In case of SMB join, we may need to send each split to the
+         * corresponding bucket-based task on the other side. In case a split needs to go to
+         * multiple downstream tasks, we need to clone the event and send it to the right
+         * destination.
+         */
+        processAllSideEvents(inputName, bucketToGroupedSplitMap);
+      } else {
+        processAllEvents(inputName, bucketToGroupedSplitMap);
+      }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
+  private void processAllSideEvents(String inputName,
+      Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
+    // the bucket to task map should have been setup by the big table.
+    if (bucketToTaskMap.isEmpty()) {
+      inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
+      return;
+    }
+    List<InputDataInformationEvent> taskEvents = new ArrayList<InputDataInformationEvent>();
+    for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
+      Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
+      for (Integer task : destTasks) {
+        for (InputSplit split : entry.getValue()) {
+          MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
+          InputDataInformationEvent diEvent =
+              InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit
+                  .toByteString().asReadOnlyByteBuffer());
+          diEvent.setTargetIndex(task);
+          taskEvents.add(diEvent);
+        }
+      }
+    }
+
+    context.addRootInputEvents(inputName, taskEvents);
+  }
+
   private void processAllEvents(String inputName,
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
 
-    Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
     List<InputSplit> finalSplits = Lists.newLinkedList();
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
       int bucketNum = entry.getKey();
@@ -248,11 +292,13 @@ public class CustomPartitionVertex exten
 
     // Construct the EdgeManager descriptor to be used by all edges which need
     // the routing table.
-    EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
-        EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
-    UserPayload payload = getBytePayload(bucketToTaskMap);
-    hiveEdgeManagerDesc.setUserPayload(payload);
-
+    EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null;
+    if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES)
+        || (vertexType == VertexType.INITIALIZED_EDGES)) {
+      hiveEdgeManagerDesc = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+      UserPayload payload = getBytePayload(bucketToTaskMap);
+      hiveEdgeManagerDesc.setUserPayload(payload);
+    }
     Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
 
     // Replace the edge manager for all vertices which have routing type custom.
@@ -285,13 +331,21 @@ public class CustomPartitionVertex exten
     rootInputSpecUpdate.put(
         inputName,
         InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
-    context.setVertexParallelism(
-        taskCount,
-        VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
-            .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
+    if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) {
+      context.setVertexParallelism(
+          taskCount,
+          VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
+              .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
+    }
 
     // Set the actual events for the tasks.
     context.addRootInputEvents(inputName, taskEvents);
+    if (inputToGroupedSplitMap.isEmpty() == false) {
+      for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) {
+        processAllSideEvents(entry.getKey(), entry.getValue());
+      }
+      inputToGroupedSplitMap.clear();
+    }
   }
 
   UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
@@ -315,7 +369,8 @@ public class CustomPartitionVertex exten
 
     if (!(inputSplit instanceof FileSplit)) {
       throw new UnsupportedOperationException(
-          "Cannot handle splits other than FileSplit for the moment");
+          "Cannot handle splits other than FileSplit for the moment. Current input split type: "
+              + inputSplit.getClass().getSimpleName());
     }
     return (FileSplit) inputSplit;
   }
@@ -327,7 +382,6 @@ public class CustomPartitionVertex exten
       Map<String, List<FileSplit>> pathFileSplitsMap) {
 
     int bucketNum = 0;
-    int fsCount = 0;
 
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
         ArrayListMultimap.<Integer, InputSplit> create();
@@ -335,14 +389,20 @@ public class CustomPartitionVertex exten
     for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
       int bucketId = bucketNum % numBuckets;
       for (FileSplit fsplit : entry.getValue()) {
-        fsCount++;
         bucketToInitialSplitMap.put(bucketId, fsplit);
       }
       bucketNum++;
     }
 
-    LOG.info("Total number of splits counted: " + fsCount + " and total files encountered: "
-        + pathFileSplitsMap.size());
+    if (bucketNum < numBuckets) {
+      int loopedBucketId = 0;
+      for (; bucketNum < numBuckets; bucketNum++) {
+        for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) {
+          bucketToInitialSplitMap.put(bucketNum, fsplit);
+        }
+        loopedBucketId++;
+      }
+    }
 
     return bucketToInitialSplitMap;
   }

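The new tail of getInitialSplits does two things: paths are dealt to buckets round-robin, and when there are fewer paths than buckets the remaining bucket ids are padded by replaying the splits of the earlier buckets. A toy run with strings standing in for FileSplits:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketFillSketch {
  public static void main(String[] args) {
    int numBuckets = 4;
    List<String> paths = Arrays.asList("p0", "p1"); // fewer paths than buckets
    Map<Integer, List<String>> buckets = new HashMap<Integer, List<String>>();

    int bucketNum = 0;
    for (String path : paths) { // round-robin assignment, as in the patch
      int bucketId = bucketNum % numBuckets;
      if (!buckets.containsKey(bucketId)) {
        buckets.put(bucketId, new ArrayList<String>());
      }
      buckets.get(bucketId).add(path);
      bucketNum++;
    }

    // Pad the remaining bucket ids by replaying the earlier buckets.
    int loopedBucketId = 0;
    for (; bucketNum < numBuckets; bucketNum++) {
      buckets.put(bucketNum, new ArrayList<String>(buckets.get(loopedBucketId)));
      loopedBucketId++;
    }
    System.out.println(buckets); // {0=[p0], 1=[p1], 2=[p0], 3=[p1]}
  }
}
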
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
+import org.apache.hadoop.io.Writable;
+
+/*
+ * This class is the payload for the custom vertex. It serializes and de-serializes the following:
+ * @numBuckets: the number of buckets of the "big table"
+ * @vertexType: the type of vertex; differentiates between bucket map joins and SMB joins
+ * @inputName: the name of the input; used in the case of SMB joins
+ */
+public class CustomVertexConfiguration implements Writable {
+
+  private int numBuckets;
+  private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES;
+  private String inputName;
+
+  public CustomVertexConfiguration() {
+  }
+
+  public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName) {
+    this.numBuckets = numBuckets;
+    this.vertexType = vertexType;
+    this.inputName = inputName;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(this.vertexType.ordinal());
+    out.writeInt(this.numBuckets);
+    out.writeUTF(inputName);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.vertexType = VertexType.values()[in.readInt()];
+    this.numBuckets = in.readInt();
+    this.inputName = in.readUTF();
+  }
+
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+
+  public VertexType getVertexType() {
+    return vertexType;
+  }
+
+  public String getInputName() {
+    return inputName;
+  }
+}
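
Because the class is a plain Hadoop Writable, the payload round-trip that CustomPartitionVertex.initialize() performs can be exercised directly. A minimal sketch; the bucket count, vertex type, and input name are arbitrary example values:

import java.nio.ByteBuffer;

import org.apache.hadoop.hive.ql.exec.tez.CustomVertexConfiguration;
import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class PayloadRoundTrip {
  public static void main(String[] args) throws Exception {
    CustomVertexConfiguration written =
        new CustomVertexConfiguration(4, VertexType.INITIALIZED_EDGES, "Map 1");

    DataOutputBuffer dob = new DataOutputBuffer();
    written.write(dob); // serialize on the vertex-builder side
    ByteBuffer payload = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    CustomVertexConfiguration read = new CustomVertexConfiguration();
    DataInputByteBuffer dibb = new DataInputByteBuffer();
    dibb.reset(payload); // mirrors initialize() above
    read.readFields(dibb);

    System.out.println(read.getNumBuckets());  // 4
    System.out.println(read.getVertexType());  // INITIALIZED_EDGES
    System.out.println(read.getInputName());   // Map 1
  }
}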