You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/24 09:03:38 UTC
svn commit: r1627235 [1/9] - in /hive/trunk: itests/src/test/resources/
itests/util/src/main/java/org/apache/hadoop/hive/ql/
metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/
ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thrift...
Author: gunther
Date: Wed Sep 24 07:03:35 2014
New Revision: 1627235
URL: http://svn.apache.org/r1627235
Log:
HIVE-7482: The execution side changes for SMB join in hive-tez (Vikram Dixit K via Gunther Hagleitner)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
hive/trunk/ql/src/test/queries/clientpositive/tez_smb_1.q
hive/trunk/ql/src/test/queries/clientpositive/tez_smb_main.q
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_1.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_10.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_11.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_12.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_13.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_14.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_15.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_16.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_2.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_3.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_4.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_5.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_6.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_7.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_8.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/auto_sortmerge_join_9.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out
Removed:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
Modified:
hive/trunk/itests/src/test/resources/testconfiguration.properties
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FieldSchema.java
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Partition.java
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/SerDeInfo.java
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java
hive/trunk/ql/if/queryplan.thrift
hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h
hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
hive/trunk/ql/src/gen/thrift/gen-php/Types.php
hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
hive/trunk/ql/src/test/results/clientpositive/tez/correlationoptimizer1.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/cross_join.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/cross_product_check_1.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/filter_join_breaktask.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/join0.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/join1.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/limit_pushdown.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/mrr.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/optimize_nullscan.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/subquery_exists.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/subquery_in.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/tez_join_tests.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/tez_joins_explain.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/tez_union.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/vectorized_dynamic_partition_pruning.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/vectorized_shufflejoin.q.out
Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Wed Sep 24 07:03:35 2014
@@ -171,7 +171,22 @@ minitez.query.files.shared=alter_merge_2
vectorized_nested_mapjoin.q,\
vectorized_ptf.q,\
vectorized_shufflejoin.q,\
- vectorized_timestamp_funcs.q
+ vectorized_timestamp_funcs.q,\
+ auto_sortmerge_join_1.q,\
+ auto_sortmerge_join_10.q,\
+ auto_sortmerge_join_11.q,\
+ auto_sortmerge_join_12.q,\
+ auto_sortmerge_join_13.q,\
+ auto_sortmerge_join_14.q,\
+ auto_sortmerge_join_15.q,\
+ auto_sortmerge_join_16.q,\
+ auto_sortmerge_join_2.q,\
+ auto_sortmerge_join_3.q,\
+ auto_sortmerge_join_4.q,\
+ auto_sortmerge_join_5.q,\
+ auto_sortmerge_join_7.q,\
+ auto_sortmerge_join_8.q,\
+ auto_sortmerge_join_9.q
minitez.query.files=bucket_map_join_tez1.q,\
bucket_map_join_tez2.q,\
@@ -189,6 +204,8 @@ minitez.query.files=bucket_map_join_tez1
tez_schema_evolution.q,\
tez_union.q,\
tez_union_decimal.q,\
+ tez_smb_main.q,\
+ tez_smb_1.q,\
vectorized_dynamic_partition_pruning.q
beeline.positive.exclude=add_part_exist.q,\
Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Wed Sep 24 07:03:35 2014
@@ -690,7 +690,10 @@ public class QTestUtil {
// conf.logVars(System.out);
// System.out.flush();
+ String execEngine = conf.get("hive.execution.engine");
+ conf.set("hive.execution.engine", "mr");
SessionState.start(conf);
+ conf.set("hive.execution.engine", execEngine);
db = Hive.get(conf);
fs = FileSystem.get(conf);
drv = new Driver(conf);
@@ -771,6 +774,8 @@ public class QTestUtil {
HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
"org.apache.hadoop.hive.ql.security.DummyAuthenticator");
+ String execEngine = conf.get("hive.execution.engine");
+ conf.set("hive.execution.engine", "mr");
CliSessionState ss = new CliSessionState(conf);
assert ss != null;
ss.in = System.in;
@@ -788,6 +793,7 @@ public class QTestUtil {
isSessionStateStarted = true;
+ conf.set("hive.execution.engine", execEngine);
return ss;
}
Modified: hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FieldSchema.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FieldSchema.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FieldSchema.java (original)
+++ hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FieldSchema.java Wed Sep 24 07:03:35 2014
@@ -135,9 +135,9 @@ public class FieldSchema implements org.
String comment)
{
this();
- this.name = org.apache.hive.common.util.HiveStringUtils.intern(name);
- this.type = org.apache.hive.common.util.HiveStringUtils.intern(type);
- this.comment = org.apache.hive.common.util.HiveStringUtils.intern(comment);
+ this.name = name;
+ this.type = type;
+ this.comment = comment;
}
/**
@@ -145,13 +145,13 @@ public class FieldSchema implements org.
*/
public FieldSchema(FieldSchema other) {
if (other.isSetName()) {
- this.name = org.apache.hive.common.util.HiveStringUtils.intern(other.name);
+ this.name = other.name;
}
if (other.isSetType()) {
- this.type = org.apache.hive.common.util.HiveStringUtils.intern(other.type);
+ this.type = other.type;
}
if (other.isSetComment()) {
- this.comment = org.apache.hive.common.util.HiveStringUtils.intern(other.comment);
+ this.comment = other.comment;
}
}
@@ -171,7 +171,7 @@ public class FieldSchema implements org.
}
public void setName(String name) {
- this.name = org.apache.hive.common.util.HiveStringUtils.intern(name);
+ this.name = name;
}
public void unsetName() {
@@ -194,7 +194,7 @@ public class FieldSchema implements org.
}
public void setType(String type) {
- this.type = org.apache.hive.common.util.HiveStringUtils.intern(type);
+ this.type = type;
}
public void unsetType() {
@@ -217,7 +217,7 @@ public class FieldSchema implements org.
}
public void setComment(String comment) {
- this.comment = org.apache.hive.common.util.HiveStringUtils.intern(comment);
+ this.comment = comment;
}
public void unsetComment() {
Modified: hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Partition.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Partition.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Partition.java (original)
+++ hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Partition.java Wed Sep 24 07:03:35 2014
@@ -182,14 +182,14 @@ public class Partition implements org.ap
{
this();
this.values = values;
- this.dbName = org.apache.hive.common.util.HiveStringUtils.intern(dbName);
- this.tableName = org.apache.hive.common.util.HiveStringUtils.intern(tableName);
+ this.dbName = dbName;
+ this.tableName = tableName;
this.createTime = createTime;
setCreateTimeIsSet(true);
this.lastAccessTime = lastAccessTime;
setLastAccessTimeIsSet(true);
this.sd = sd;
- this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+ this.parameters = parameters;
}
/**
@@ -205,10 +205,10 @@ public class Partition implements org.ap
this.values = __this__values;
}
if (other.isSetDbName()) {
- this.dbName = org.apache.hive.common.util.HiveStringUtils.intern(other.dbName);
+ this.dbName = other.dbName;
}
if (other.isSetTableName()) {
- this.tableName = org.apache.hive.common.util.HiveStringUtils.intern(other.tableName);
+ this.tableName = other.tableName;
}
this.createTime = other.createTime;
this.lastAccessTime = other.lastAccessTime;
@@ -222,9 +222,9 @@ public class Partition implements org.ap
String other_element_key = other_element.getKey();
String other_element_value = other_element.getValue();
- String __this__parameters_copy_key = org.apache.hive.common.util.HiveStringUtils.intern(other_element_key);
+ String __this__parameters_copy_key = other_element_key;
- String __this__parameters_copy_value = org.apache.hive.common.util.HiveStringUtils.intern(other_element_value);
+ String __this__parameters_copy_value = other_element_value;
__this__parameters.put(__this__parameters_copy_key, __this__parameters_copy_value);
}
@@ -296,7 +296,7 @@ public class Partition implements org.ap
}
public void setDbName(String dbName) {
- this.dbName = org.apache.hive.common.util.HiveStringUtils.intern(dbName);
+ this.dbName = dbName;
}
public void unsetDbName() {
@@ -319,7 +319,7 @@ public class Partition implements org.ap
}
public void setTableName(String tableName) {
- this.tableName = org.apache.hive.common.util.HiveStringUtils.intern(tableName);
+ this.tableName = tableName;
}
public void unsetTableName() {
@@ -420,7 +420,7 @@ public class Partition implements org.ap
}
public void setParameters(Map<String,String> parameters) {
- this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+ this.parameters = parameters;
}
public void unsetParameters() {
Modified: hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/SerDeInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/SerDeInfo.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/SerDeInfo.java (original)
+++ hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/SerDeInfo.java Wed Sep 24 07:03:35 2014
@@ -137,9 +137,9 @@ public class SerDeInfo implements org.ap
Map<String,String> parameters)
{
this();
- this.name = org.apache.hive.common.util.HiveStringUtils.intern(name);
- this.serializationLib = org.apache.hive.common.util.HiveStringUtils.intern(serializationLib);
- this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+ this.name = name;
+ this.serializationLib = serializationLib;
+ this.parameters = parameters;
}
/**
@@ -147,10 +147,10 @@ public class SerDeInfo implements org.ap
*/
public SerDeInfo(SerDeInfo other) {
if (other.isSetName()) {
- this.name = org.apache.hive.common.util.HiveStringUtils.intern(other.name);
+ this.name = other.name;
}
if (other.isSetSerializationLib()) {
- this.serializationLib = org.apache.hive.common.util.HiveStringUtils.intern(other.serializationLib);
+ this.serializationLib = other.serializationLib;
}
if (other.isSetParameters()) {
Map<String,String> __this__parameters = new HashMap<String,String>();
@@ -159,9 +159,9 @@ public class SerDeInfo implements org.ap
String other_element_key = other_element.getKey();
String other_element_value = other_element.getValue();
- String __this__parameters_copy_key = org.apache.hive.common.util.HiveStringUtils.intern(other_element_key);
+ String __this__parameters_copy_key = other_element_key;
- String __this__parameters_copy_value = org.apache.hive.common.util.HiveStringUtils.intern(other_element_value);
+ String __this__parameters_copy_value = other_element_value;
__this__parameters.put(__this__parameters_copy_key, __this__parameters_copy_value);
}
@@ -185,7 +185,7 @@ public class SerDeInfo implements org.ap
}
public void setName(String name) {
- this.name = org.apache.hive.common.util.HiveStringUtils.intern(name);
+ this.name = name;
}
public void unsetName() {
@@ -208,7 +208,7 @@ public class SerDeInfo implements org.ap
}
public void setSerializationLib(String serializationLib) {
- this.serializationLib = org.apache.hive.common.util.HiveStringUtils.intern(serializationLib);
+ this.serializationLib = serializationLib;
}
public void unsetSerializationLib() {
@@ -242,7 +242,7 @@ public class SerDeInfo implements org.ap
}
public void setParameters(Map<String,String> parameters) {
- this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+ this.parameters = parameters;
}
public void unsetParameters() {
Modified: hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java (original)
+++ hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java Wed Sep 24 07:03:35 2014
@@ -216,17 +216,17 @@ public class StorageDescriptor implement
{
this();
this.cols = cols;
- this.location = org.apache.hive.common.util.HiveStringUtils.intern(location);
- this.inputFormat = org.apache.hive.common.util.HiveStringUtils.intern(inputFormat);
- this.outputFormat = org.apache.hive.common.util.HiveStringUtils.intern(outputFormat);
+ this.location = location;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
this.compressed = compressed;
setCompressedIsSet(true);
this.numBuckets = numBuckets;
setNumBucketsIsSet(true);
this.serdeInfo = serdeInfo;
- this.bucketCols = org.apache.hive.common.util.HiveStringUtils.intern(bucketCols);
+ this.bucketCols = bucketCols;
this.sortCols = sortCols;
- this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+ this.parameters = parameters;
}
/**
@@ -242,13 +242,13 @@ public class StorageDescriptor implement
this.cols = __this__cols;
}
if (other.isSetLocation()) {
- this.location = org.apache.hive.common.util.HiveStringUtils.intern(other.location);
+ this.location = other.location;
}
if (other.isSetInputFormat()) {
- this.inputFormat = org.apache.hive.common.util.HiveStringUtils.intern(other.inputFormat);
+ this.inputFormat = other.inputFormat;
}
if (other.isSetOutputFormat()) {
- this.outputFormat = org.apache.hive.common.util.HiveStringUtils.intern(other.outputFormat);
+ this.outputFormat = other.outputFormat;
}
this.compressed = other.compressed;
this.numBuckets = other.numBuckets;
@@ -276,9 +276,9 @@ public class StorageDescriptor implement
String other_element_key = other_element.getKey();
String other_element_value = other_element.getValue();
- String __this__parameters_copy_key = org.apache.hive.common.util.HiveStringUtils.intern(other_element_key);
+ String __this__parameters_copy_key = other_element_key;
- String __this__parameters_copy_value = org.apache.hive.common.util.HiveStringUtils.intern(other_element_value);
+ String __this__parameters_copy_value = other_element_value;
__this__parameters.put(__this__parameters_copy_key, __this__parameters_copy_value);
}
@@ -356,7 +356,7 @@ public class StorageDescriptor implement
}
public void setLocation(String location) {
- this.location = org.apache.hive.common.util.HiveStringUtils.intern(location);
+ this.location = location;
}
public void unsetLocation() {
@@ -379,7 +379,7 @@ public class StorageDescriptor implement
}
public void setInputFormat(String inputFormat) {
- this.inputFormat = org.apache.hive.common.util.HiveStringUtils.intern(inputFormat);
+ this.inputFormat = inputFormat;
}
public void unsetInputFormat() {
@@ -402,7 +402,7 @@ public class StorageDescriptor implement
}
public void setOutputFormat(String outputFormat) {
- this.outputFormat = org.apache.hive.common.util.HiveStringUtils.intern(outputFormat);
+ this.outputFormat = outputFormat;
}
public void unsetOutputFormat() {
@@ -507,7 +507,7 @@ public class StorageDescriptor implement
}
public void setBucketCols(List<String> bucketCols) {
- this.bucketCols = org.apache.hive.common.util.HiveStringUtils.intern(bucketCols);
+ this.bucketCols = bucketCols;
}
public void unsetBucketCols() {
@@ -579,7 +579,7 @@ public class StorageDescriptor implement
}
public void setParameters(Map<String,String> parameters) {
- this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+ this.parameters = parameters;
}
public void unsetParameters() {
Modified: hive/trunk/ql/if/queryplan.thrift
URL: http://svn.apache.org/viewvc/hive/trunk/ql/if/queryplan.thrift?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/if/queryplan.thrift (original)
+++ hive/trunk/ql/if/queryplan.thrift Wed Sep 24 07:03:35 2014
@@ -59,6 +59,7 @@ enum OperatorType {
EVENT,
ORCFILEMERGE,
RCFILEMERGE,
+ MERGEJOIN,
}
struct Operator {
Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp Wed Sep 24 07:03:35 2014
@@ -54,7 +54,8 @@ int _kOperatorTypeValues[] = {
OperatorType::DEMUX,
OperatorType::EVENT,
OperatorType::ORCFILEMERGE,
- OperatorType::RCFILEMERGE
+ OperatorType::RCFILEMERGE,
+ OperatorType::MERGEJOIN
};
const char* _kOperatorTypeNames[] = {
"JOIN",
@@ -80,9 +81,10 @@ const char* _kOperatorTypeNames[] = {
"DEMUX",
"EVENT",
"ORCFILEMERGE",
- "RCFILEMERGE"
+ "RCFILEMERGE",
+ "MERGEJOIN"
};
-const std::map<int, const char*> _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(24, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(25, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
int _kTaskTypeValues[] = {
TaskType::MAP,
Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h Wed Sep 24 07:03:35 2014
@@ -59,7 +59,8 @@ struct OperatorType {
DEMUX = 20,
EVENT = 21,
ORCFILEMERGE = 22,
- RCFILEMERGE = 23
+ RCFILEMERGE = 23,
+ MERGEJOIN = 24
};
};
Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java Wed Sep 24 07:03:35 2014
@@ -35,7 +35,8 @@ public enum OperatorType implements org.
DEMUX(20),
EVENT(21),
ORCFILEMERGE(22),
- RCFILEMERGE(23);
+ RCFILEMERGE(23),
+ MERGEJOIN(24);
private final int value;
@@ -104,6 +105,8 @@ public enum OperatorType implements org.
return ORCFILEMERGE;
case 23:
return RCFILEMERGE;
+ case 24:
+ return MERGEJOIN;
default:
return null;
}
Modified: hive/trunk/ql/src/gen/thrift/gen-php/Types.php
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-php/Types.php?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-php/Types.php (original)
+++ hive/trunk/ql/src/gen/thrift/gen-php/Types.php Wed Sep 24 07:03:35 2014
@@ -59,6 +59,7 @@ final class OperatorType {
const EVENT = 21;
const ORCFILEMERGE = 22;
const RCFILEMERGE = 23;
+ const MERGEJOIN = 24;
static public $__names = array(
0 => 'JOIN',
1 => 'MAPJOIN',
@@ -84,6 +85,7 @@ final class OperatorType {
21 => 'EVENT',
22 => 'ORCFILEMERGE',
23 => 'RCFILEMERGE',
+ 24 => 'MERGEJOIN',
);
}
Modified: hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py (original)
+++ hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py Wed Sep 24 07:03:35 2014
@@ -69,6 +69,7 @@ class OperatorType:
EVENT = 21
ORCFILEMERGE = 22
RCFILEMERGE = 23
+ MERGEJOIN = 24
_VALUES_TO_NAMES = {
0: "JOIN",
@@ -95,6 +96,7 @@ class OperatorType:
21: "EVENT",
22: "ORCFILEMERGE",
23: "RCFILEMERGE",
+ 24: "MERGEJOIN",
}
_NAMES_TO_VALUES = {
@@ -122,6 +124,7 @@ class OperatorType:
"EVENT": 21,
"ORCFILEMERGE": 22,
"RCFILEMERGE": 23,
+ "MERGEJOIN": 24,
}
class TaskType:
Modified: hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb (original)
+++ hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb Wed Sep 24 07:03:35 2014
@@ -45,8 +45,9 @@ module OperatorType
EVENT = 21
ORCFILEMERGE = 22
RCFILEMERGE = 23
- VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT", 22 => "ORCFILEMERGE", 23 => "RCFILEMERGE"}
- VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT, ORCFILEMERGE, RCFILEMERGE]).freeze
+ MERGEJOIN = 24
+ VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT", 22 => "ORCFILEMERGE", 23 => "RCFILEMERGE", 24 => "MERGEJOIN"}
+ VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT, ORCFILEMERGE, RCFILEMERGE, MERGEJOIN]).freeze
end
module TaskType
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Wed Sep 24 07:03:35 2014
@@ -61,13 +61,13 @@ public abstract class AbstractMapJoinOpe
@Override
@SuppressWarnings("unchecked")
protected void initializeOp(Configuration hconf) throws HiveException {
- int tagLen = conf.getTagLength();
-
- joinKeys = new List[tagLen];
-
- JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE);
- joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
- inputObjInspectors,NOTSKIPBIGTABLE, tagLen);
+ if (conf.getGenJoinKeys()) {
+ int tagLen = conf.getTagLength();
+ joinKeys = new List[tagLen];
+ JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE);
+ joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
+ inputObjInspectors,NOTSKIPBIGTABLE, tagLen);
+ }
super.initializeOp(hconf);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Wed Sep 24 07:03:35 2014
@@ -323,7 +323,6 @@ public abstract class CommonJoinOperator
@Override
public void startGroup() throws HiveException {
- LOG.trace("Join: Starting new group");
newGroupStarted = true;
for (AbstractRowContainer<List<Object>> alw : storage) {
alw.clearRows();
@@ -632,8 +631,6 @@ public abstract class CommonJoinOperator
*/
@Override
public void endGroup() throws HiveException {
- LOG.trace("Join Op: endGroup called: numValues=" + numAliases);
-
checkAndGenObject();
}
@@ -719,7 +716,6 @@ public abstract class CommonJoinOperator
if (noOuterJoin) {
if (alw.rowCount() == 0) {
- LOG.trace("No data for alias=" + i);
return;
} else if (alw.rowCount() > 1) {
mayHasMoreThanOne = true;
@@ -776,7 +772,6 @@ public abstract class CommonJoinOperator
*/
@Override
public void closeOp(boolean abort) throws HiveException {
- LOG.trace("Join Op close");
for (AbstractRowContainer<List<Object>> alw : storage) {
if (alw != null) {
alw.clearRows(); // clean up the temp files
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,507 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+/*
+ * With an aim to consolidate the join algorithms to either hash based joins (MapJoinOperator) or
+ * sort-merge based joins, this operator is being introduced. This operator executes a sort-merge
+ * based algorithm. It replaces both the JoinOperator and the SMBMapJoinOperator for the tez side of
+ * things. It works in either the map phase or reduce phase.
+ *
+ * The basic algorithm is as follows:
+ *
+ * 1. The processOp receives a row from a "big" table.
+ * 2. In order to process it, the operator does a fetch for rows from the other tables.
+ * 3. Once we have a set of rows from the other tables (till we hit a new key), more rows are
+ * brought in from the big table and a join is performed.
+ */
+
+public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMergeJoinDesc> implements
+ Serializable {
+
+ private static final long serialVersionUID = 1L;
+ // Flag exposed through isBigTableWork()/setIsBigTableWork(); not read anywhere else in this class.
+ private boolean isBigTableWork;
+ private static final Log LOG = LogFactory.getLog(CommonMergeJoinOperator.class.getName());
+ // NOTE(review): never read or written in this class - confirm it is still needed.
+ private Map<Integer, String> aliasToInputNameMap;
+ // Per alias: join key of the group currently held in candidateStorage (null when there is none).
+ transient List<Object>[] keyWritables;
+ // Per alias: join key of the group buffered in nextGroupStorage (null when there is none).
+ transient List<Object>[] nextKeyWritables;
+ // Per alias: rows of the following key group, filled once a row with a different key shows up.
+ transient RowContainer<List<Object>>[] nextGroupStorage;
+ // Per alias: rows of the current key group, i.e. the candidates for the next join.
+ transient RowContainer<List<Object>>[] candidateStorage;
+
+ // NOTE(review): unused in this class - confirm it is still needed.
+ transient String[] tagToAlias;
+ // Per alias: negated result of the last RecordSource.pushRecord() call (see fetchOneRow).
+ private transient boolean[] fetchDone;
+ // Per alias: true while a next key group is pending promotion (set in processOp/fetchOneRow).
+ private transient boolean[] foundNextKeyGroup;
+ // Set on the first processOp call, after the initial fetch for all non-big-table aliases.
+ transient boolean firstFetchHappened = false;
+ // NOTE(review): the next four fields are unused in this class - confirm they are still needed.
+ transient boolean localWorkInited = false;
+ transient boolean initDone = false;
+ transient List<Object> otherKey = null;
+ transient List<Object> values = null;
+ // Record sources taken from the TezContext in initializeOp; indexed by tag in fetchOneRow.
+ transient RecordSource[] sources;
+ // Filled by cloneOriginalParentsList(); getTagForOperator() returns an operator's index in it.
+ transient List<Operator<? extends OperatorDesc>> originalParents =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+
+ /** No-argument constructor; only delegates to the superclass constructor. */
+ public CommonMergeJoinOperator() {
+ super();
+ }
+
+ /**
+  * Initializes the parent state and the children, allocates the per-alias merge state (key
+  * slots, current/next row containers, fetch flags) and picks up the record sources from the
+  * Tez context.
+  *
+  * @param hconf configuration the cache size settings are read from
+  * @throws HiveException if the parent initialization or a row container cannot be set up
+  */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initializeOp(Configuration hconf) throws HiveException {
+   super.initializeOp(hconf);
+   firstFetchHappened = false;
+   initializeChildren(hconf);
+
+   // The arrays get one slot more than the highest position found in order.
+   int highestPos = 0;
+   for (byte pos = 0; pos < order.length; pos++) {
+     highestPos = Math.max(highestPos, pos);
+   }
+   final int slots = highestPos + 1;
+
+   nextGroupStorage = new RowContainer[slots];
+   candidateStorage = new RowContainer[slots];
+   keyWritables = new ArrayList[slots];
+   nextKeyWritables = new ArrayList[slots];
+   fetchDone = new boolean[slots];
+   foundNextKeyGroup = new boolean[slots];
+
+   // A map-join bucket cache size other than 100 wins over the SMB cache rows setting.
+   final int mapJoinCacheSize =
+       HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE);
+   final int bucketSize = (mapJoinCacheSize != 100)
+       ? mapJoinCacheSize
+       : HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESMBJOINCACHEROWS);
+
+   // Two containers per alias: one for the group after the current one, one for the current.
+   for (byte pos = 0; pos < order.length; pos++) {
+     nextGroupStorage[pos] =
+         JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], pos,
+             bucketSize, spillTableDesc, conf, !hasFilter(pos), reporter);
+     candidateStorage[pos] =
+         JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], pos,
+             bucketSize, spillTableDesc, conf, !hasFilter(pos), reporter);
+   }
+
+   for (byte pos = 0; pos < order.length; pos++) {
+     foundNextKeyGroup[pos] = false;
+     if (pos != posBigTable) {
+       fetchDone[pos] = false;
+     }
+   }
+
+   final TezContext tezContext = (TezContext) MapredContext.get();
+   sources = tezContext.getRecordSources();
+ }
+
+ /*
+  * (non-Javadoc)
+  *
+  * @see org.apache.hadoop.hive.ql.exec.Operator#processOp(java.lang.Object, int)
+  *
+  * This operator works in a push-pull fashion: the first call to this method is a push, and
+  * from then on the rows of the other aliases are pulled until the sources run dry.
+  */
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+   posBigTable = (byte) conf.getBigTablePosition();
+
+   final byte alias = (byte) tag;
+   // values and keys are both computed as standard objects
+   final List<Object> rowValue = getFilteredValue(alias, row);
+   final List<Object> rowKey = mergeJoinComputeKeys(row, alias);
+
+   if (!firstFetchHappened) {
+     firstFetchHappened = true;
+     // pull the first group of every alias except the big table
+     for (byte pos = 0; pos < order.length; pos++) {
+       if (pos == posBigTable) {
+         continue;
+       }
+       fetchNextGroup(pos);
+     }
+   }
+
+   if (!processKey(alias, rowKey)) {
+     // same key as the current group: just add the row to the candidates
+     reportProgress();
+     numMapRowsRead++;
+     candidateStorage[tag].addRow(rowValue);
+     return;
+   }
+
+   // the key changed, so this row opens the next group of its alias
+   this.nextGroupStorage[alias].addRow(rowValue);
+   foundNextKeyGroup[tag] = true;
+   if (tag != posBigTable) {
+     return;
+   }
+
+   reportProgress();
+   numMapRowsRead++;
+
+   // The big table reached a new key group: let the other tables catch up, and stop as soon
+   // as the join needs more input from the big table itself.
+   List<Byte> fetchedAliases;
+   do {
+     fetchedAliases = joinOneGroup();
+   } while (fetchedAliases != null && fetchedAliases.size() > 0
+       && !fetchedAliases.contains(this.posBigTable));
+ }
+
+ private List<Byte> joinOneGroup() throws HiveException {
+ int[] smallestPos = findSmallestKey();
+ List<Byte> listOfNeedFetchNext = null;
+ if (smallestPos != null) {
+ listOfNeedFetchNext = joinObject(smallestPos);
+ if (listOfNeedFetchNext.size() > 0) {
+ // listOfNeedFetchNext contains all tables that we have joined data in their
+ // candidateStorage, and we need to clear candidate storage and promote their
+ // nextGroupStorage to candidateStorage and fetch data until we reach a
+ // new group.
+ for (Byte b : listOfNeedFetchNext) {
+ try {
+ fetchNextGroup(b);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+ }
+ }
+ return listOfNeedFetchNext;
+ }
+
+ /**
+ * Joins the candidate groups of the aliases that hold the smallest key and emits the result.
+ *
+ * @param smallestPos per-alias comparison results as produced by findSmallestKey()
+ * @return the aliases whose candidate rows were joined; their candidate storage has been
+ * cleared and their current key reset, so they need their next group fetched
+ * @throws HiveException if generating the join output or clearing a container fails
+ */
+ private List<Byte> joinObject(int[] smallestPos) throws HiveException {
+ List<Byte> needFetchList = new ArrayList<Byte>();
+ // Scan from the last alias down to the first one.
+ byte index = (byte) (smallestPos.length - 1);
+ for (; index >= 0; index--) {
+ // A positive comparison or a missing key means this alias does not hold the smallest key.
+ if (smallestPos[index] > 0 || keyWritables[index] == null) {
+ putDummyOrEmpty(index);
+ continue;
+ }
+ storage[index] = candidateStorage[index];
+ needFetchList.add(index);
+ // A negative entry marks the alias where findSmallestKey() picked up the smallest key;
+ // every alias before it compared against an earlier, larger key and cannot match.
+ if (smallestPos[index] < 0) {
+ break;
+ }
+ }
+ // The remaining (earlier) aliases do not take part in this join.
+ for (index--; index >= 0; index--) {
+ putDummyOrEmpty(index);
+ }
+ checkAndGenObject();
+ // The joined groups are consumed: drop their rows and forget their keys.
+ for (Byte pos : needFetchList) {
+ this.candidateStorage[pos].clearRows();
+ this.keyWritables[pos] = null;
+ }
+ return needFetchList;
+ }
+
+ /**
+  * Fills the join storage slot of an alias that has no rows for the current key: the shared
+  * empty list when there is no outer join, otherwise that alias' dummy object vector.
+  */
+ private void putDummyOrEmpty(Byte i) {
+   storage[i] = noOuterJoin ? emptyList : dummyObjVectors[i];
+ }
+
+ private int[] findSmallestKey() {
+ int[] result = new int[order.length];
+ List<Object> smallestOne = null;
+
+ for (byte pos = 0; pos < order.length; pos++) {
+ List<Object> key = keyWritables[pos];
+ if (key == null) {
+ continue;
+ }
+ if (smallestOne == null) {
+ smallestOne = key;
+ result[pos] = -1;
+ continue;
+ }
+ result[pos] = compareKeys(key, smallestOne);
+ if (result[pos] < 0) {
+ smallestOne = key;
+ }
+ }
+ return smallestOne == null ? null : result;
+ }
+
+ private void fetchNextGroup(Byte t) throws HiveException {
+ if (foundNextKeyGroup[t]) {
+ // first promote the next group to be the current group if we reached a
+ // new group in the previous fetch
+ if ((this.nextKeyWritables[t] != null) || (this.fetchDone[t] == false)) {
+ promoteNextGroupToCandidate(t);
+ } else {
+ this.keyWritables[t] = null;
+ this.candidateStorage[t] = null;
+ this.nextGroupStorage[t] = null;
+ }
+ foundNextKeyGroup[t] = false;
+ }
+ // for the big table, we only need to promote the next group to the current group.
+ if (t == posBigTable) {
+ return;
+ }
+
+ // for tables other than the big table, we need to fetch more data until reach a new group or
+ // done.
+ while (!foundNextKeyGroup[t]) {
+ if (fetchDone[t]) {
+ break;
+ }
+ fetchOneRow(t);
+ }
+ if (!foundNextKeyGroup[t] && fetchDone[t]) {
+ this.nextKeyWritables[t] = null;
+ }
+ }
+
+ /**
+  * Joins the rows that are still buffered, lets the superclass release its resources and
+  * resets the per-alias fetch state.
+  *
+  * @param abort passed through to the superclass
+  * @throws HiveException if the final join or the superclass cleanup fails
+  */
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+   joinFinalLeftData();
+
+   // Chain to the superclass: this override used to swallow the call, so the cleanup done
+   // further up the hierarchy (CommonJoinOperator clears its row containers and thereby
+   // their temp files) never ran for this operator.
+   super.closeOp(abort);
+
+   // reset the fetch state
+   for (int pos = 0; pos < order.length; pos++) {
+     if (pos != posBigTable) {
+       fetchDone[pos] = false;
+     }
+     foundNextKeyGroup[pos] = false;
+   }
+ }
+
+ private void fetchOneRow(byte tag) throws HiveException {
+ try {
+ fetchDone[tag] = !sources[tag].pushRecord();
+ if (sources[tag].isGrouped()) {
+ // instead of maintaining complex state for the fetch of the next group,
+ // we know for sure that at the end of all the values for a given key,
+ // we will definitely reach the next key group.
+ foundNextKeyGroup[tag] = true;
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ /**
+  * Called from closeOp() to join everything that is still buffered: first while the big
+  * table has candidate rows, then while any other table still has input, and finally until
+  * no candidate storage holds rows any more.
+  *
+  * @throws HiveException if joining a group fails
+  */
+ private void joinFinalLeftData() throws HiveException {
+   @SuppressWarnings("rawtypes")
+   RowContainer bigTblRowContainer = this.candidateStorage[this.posBigTable];
+
+   boolean allFetchDone = allFetchDone();
+   // While the big table still has candidate rows and some small table has input left,
+   // keep joining so that the small tables can catch up.
+   while (bigTblRowContainer != null && bigTblRowContainer.rowCount() > 0 && !allFetchDone) {
+     joinOneGroup();
+     bigTblRowContainer = this.candidateStorage[this.posBigTable];
+     allFetchDone = allFetchDone();
+   }
+
+   // Drain the small tables that still have input.
+   while (!allFetchDone) {
+     List<Byte> ret = joinOneGroup();
+     if (ret == null || ret.size() == 0) {
+       break;
+     }
+     reportProgress();
+     numMapRowsRead++;
+     allFetchDone = allFetchDone();
+   }
+
+   // Finally join whatever is still cached, promoting pending next groups as we go.
+   boolean dataInCache = true;
+   while (dataInCache) {
+     for (byte pos = 0; pos < order.length; pos++) {
+       if (this.foundNextKeyGroup[pos] && this.nextKeyWritables[pos] != null) {
+         promoteNextGroupToCandidate(pos);
+       }
+     }
+     joinOneGroup();
+     dataInCache = false;
+     for (byte pos = 0; pos < order.length; pos++) {
+       // fetchNextGroup() sets the containers of an exhausted alias to null, so the slot
+       // must be checked before it is dereferenced.
+       RowContainer<List<Object>> candidate = this.candidateStorage[pos];
+       if (candidate != null && candidate.rowCount() > 0) {
+         dataInCache = true;
+         break;
+       }
+     }
+   }
+ }
+
+ /**
+  * @return true when every alias other than the big table is flagged as done fetching
+  */
+ private boolean allFetchDone() {
+   for (byte pos = 0; pos < order.length; pos++) {
+     if (pos != posBigTable && !fetchDone[pos]) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ private void promoteNextGroupToCandidate(Byte t) throws HiveException {
+ this.keyWritables[t] = this.nextKeyWritables[t];
+ this.nextKeyWritables[t] = null;
+ RowContainer<List<Object>> oldRowContainer = this.candidateStorage[t];
+ oldRowContainer.clearRows();
+ this.candidateStorage[t] = this.nextGroupStorage[t];
+ this.nextGroupStorage[t] = oldRowContainer;
+ }
+
+ private boolean processKey(byte alias, List<Object> key) throws HiveException {
+ List<Object> keyWritable = keyWritables[alias];
+ if (keyWritable == null) {
+ // the first group.
+ keyWritables[alias] = key;
+ return false;
+ } else {
+ int cmp = compareKeys(key, keyWritable);
+ if (cmp != 0) {
+ nextKeyWritables[alias] = key;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+  * Compares two join keys column by column.
+  *
+  * Keys with a different number of columns are ordered by size. A column that is NULL on both
+  * sides counts as equal only when that column is null-safe; in that case the comparison
+  * continues with the remaining columns instead of declaring the whole keys equal. Without
+  * null-safe equality a NULL never matches, so k1 is reported as smaller.
+  *
+  * @param k1 first key
+  * @param k2 second key
+  * @return a negative value, zero or a positive value when k1 sorts before, equal to or after k2
+  */
+ @SuppressWarnings("rawtypes")
+ private int compareKeys(List<Object> k1, List<Object> k2) {
+   // do the join keys have different sizes?
+   int ret = k1.size() - k2.size();
+   if (ret != 0) {
+     return ret;
+   }
+
+   for (int i = 0; i < k1.size(); i++) {
+     WritableComparable key_1 = (WritableComparable) k1.get(i);
+     WritableComparable key_2 = (WritableComparable) k2.get(i);
+     if (key_1 == null && key_2 == null) {
+       if (nullsafes != null && nullsafes[i]) {
+         // NULL <=> NULL holds for this column; the later columns still have to agree
+         continue;
+       }
+       return -1; // NULLs never match here: just report k1 as smaller than k2
+     } else if (key_1 == null) {
+       return -1;
+     } else if (key_2 == null) {
+       return 1;
+     }
+     ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2);
+     if (ret != 0) {
+       return ret;
+     }
+   }
+   return ret;
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<Object> mergeJoinComputeKeys(Object row, Byte alias) throws HiveException {
+ if ((joinKeysObjectInspectors != null) && (joinKeysObjectInspectors[alias] != null)) {
+ return JoinUtil.computeKeys(row, joinKeys[alias], joinKeysObjectInspectors[alias]);
+ } else {
+ row =
+ ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[alias],
+ ObjectInspectorCopyOption.WRITABLE);
+ StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[alias];
+ StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString());
+ return (List<Object>) soi.getStructFieldData(row, sf);
+ }
+ }
+
+ /** Returns the operator name by delegating to the static getOperatorName(). */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ /** Returns the fixed name of this operator type, "MERGEJOIN". */
+ static public String getOperatorName() {
+ return "MERGEJOIN";
+ }
+
+ /** Returns the plan-level operator type, OperatorType.MERGEJOIN. */
+ @Override
+ public OperatorType getType() {
+ return OperatorType.MERGEJOIN;
+ }
+
+ /**
+  * Wires the dummy store operators reported by the first non-null parent into this operator:
+  * each one is inserted into the parent list at its tag and gets this operator as a child.
+  * The superclass initialization runs afterwards.
+  *
+  * @throws HiveException if there is no non-null parent operator
+  */
+ @Override
+ public void initializeLocalWork(Configuration hconf) throws HiveException {
+   Operator<? extends OperatorDesc> firstParent = null;
+   for (Operator<? extends OperatorDesc> candidate : parentOperators) {
+     if (candidate != null) {
+       firstParent = candidate;
+       break;
+     }
+   }
+   if (firstParent == null) {
+     throw new HiveException("No valid parents.");
+   }
+
+   final Map<Integer, DummyStoreOperator> tagToDummyStore = firstParent.getTagToOperatorTree();
+   for (Entry<Integer, DummyStoreOperator> tagged : tagToDummyStore.entrySet()) {
+     final DummyStoreOperator dummyStore = tagged.getValue();
+     parentOperators.add(tagged.getKey(), dummyStore);
+     dummyStore.getChildOperators().add(this);
+   }
+   super.initializeLocalWork(hconf);
+ }
+
+ /** Returns the flag last stored through setIsBigTableWork(). */
+ public boolean isBigTableWork() {
+ return isBigTableWork;
+ }
+
+ /** Stores the flag that isBigTableWork() reports. */
+ public void setIsBigTableWork(boolean bigTableWork) {
+ this.isBigTableWork = bigTableWork;
+ }
+
+ /**
+ * Returns the position of the given operator in the list recorded by
+ * cloneOriginalParentsList(), or -1 when the operator is not in that list.
+ */
+ public int getTagForOperator(Operator<? extends OperatorDesc> op) {
+ return originalParents.indexOf(op);
+ }
+
+ /**
+ * Appends all operators of the given list to the recorded original parents; the operators
+ * themselves are not copied.
+ */
+ public void cloneOriginalParentsList(List<Operator<? extends OperatorDesc>> opList) {
+ originalParents.addAll(opList);
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java Wed Sep 24 07:03:35 2014
@@ -65,7 +65,7 @@ import org.apache.hadoop.hive.serde2.obj
*/
public class DummyStoreOperator extends Operator<DummyStoreDesc> implements Serializable {
- private transient InspectableObject result;
+ protected transient InspectableObject result;
public DummyStoreOperator() {
super();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Wed Sep 24 07:03:35 2014
@@ -165,7 +165,7 @@ public class FetchOperator implements Se
private void setupExecContext() {
if (hasVC || work.getSplitSample() != null) {
- context = new ExecMapperContext();
+ context = new ExecMapperContext(job);
if (operator != null) {
operator.setExecContext(context);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Wed Sep 24 07:03:35 2014
@@ -76,7 +76,7 @@ public class FilterOperator extends Oper
statsMap.put(Counter.FILTERED, filtered_count);
statsMap.put(Counter.PASSED, passed_count);
conditionInspector = null;
- ioContext = IOContext.get();
+ ioContext = IOContext.get(hconf.get(Utilities.INPUT_NAME));
} catch (Throwable e) {
throw new HiveException(e);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Wed Sep 24 07:03:35 2014
@@ -171,8 +171,9 @@ public class MapJoinOperator extends Abs
private void loadHashTable() throws HiveException {
- if (this.getExecContext().getLocalWork() == null
- || !this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
+ if ((this.getExecContext() != null)
+ && ((this.getExecContext().getLocalWork() == null) || (!this.getExecContext()
+ .getLocalWork().getInputFileChangeSensitive()))) {
if (hashTblInitedOnce) {
return;
} else {
@@ -313,8 +314,8 @@ public class MapJoinOperator extends Abs
tableContainer.dumpMetrics();
}
}
- if ((this.getExecContext().getLocalWork() != null
- && this.getExecContext().getLocalWork().getInputFileChangeSensitive())
+ if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
+ && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())
&& mapJoinTables != null) {
for (MapJoinTableContainer tableContainer : mapJoinTables) {
if (tableContainer != null) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Sep 24 07:03:35 2014
@@ -33,9 +33,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -181,7 +182,7 @@ public class MapOperator extends Operato
PartitionDesc pd = ctx.partDesc;
TableDesc td = pd.getTableDesc();
-
+
MapOpCtx opCtx = new MapOpCtx();
// Use table properties in case of unpartitioned tables,
// and the union of table properties and partition properties, with partition
@@ -205,42 +206,42 @@ public class MapOperator extends Operato
opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
partRawRowObjectInspector, opCtx.tblRawRowObjectInspector);
-
+
// Next check if this table has partitions and if so
// get the list of partition names as well as allocate
// the serdes for the partition columns
String pcols = overlayedProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-
+
if (pcols != null && pcols.length() > 0) {
String[] partKeys = pcols.trim().split("/");
String pcolTypes = overlayedProps
.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
String[] partKeyTypes = pcolTypes.trim().split(":");
-
+
if (partKeys.length > partKeyTypes.length) {
throw new HiveException("Internal error : partKeys length, " +partKeys.length +
" greater than partKeyTypes length, " + partKeyTypes.length);
}
-
+
List<String> partNames = new ArrayList<String>(partKeys.length);
Object[] partValues = new Object[partKeys.length];
List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
-
+
for (int i = 0; i < partKeys.length; i++) {
String key = partKeys[i];
partNames.add(key);
ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector
(TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-
+
// Partitions do not exist for this table
if (partSpec == null) {
// for partitionless table, initialize partValue to null
partValues[i] = null;
} else {
- partValues[i] =
+ partValues[i] =
ObjectInspectorConverters.
getConverter(PrimitiveObjectInspectorFactory.
- javaStringObjectInspector, oi).convert(partSpec.get(key));
+ javaStringObjectInspector, oi).convert(partSpec.get(key));
}
partObjectInspectors.add(oi);
}
@@ -337,13 +338,8 @@ public class MapOperator extends Operato
return tableDescOI;
}
- private boolean isPartitioned(PartitionDesc pd) {
- return pd.getPartSpec() != null && !pd.getPartSpec().isEmpty();
- }
-
public void setChildren(Configuration hconf) throws HiveException {
-
- Path fpath = IOContext.get().getInputPath();
+ Path fpath = IOContext.get(hconf.get(Utilities.INPUT_NAME)).getInputPath();
boolean schemeless = fpath.toUri().getScheme() == null;
@@ -639,4 +635,8 @@ public class MapOperator extends Operato
return null;
}
+ /** Returns the connect operators held by MapRecordProcessor instead of asking a parent. */
+ @Override
+ public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
+ return MapRecordProcessor.getConnectOps();
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Sep 24 07:03:35 2014
@@ -146,6 +146,7 @@ public abstract class Operator<T extends
/**
* Implements the getChildren function for the Node Interface.
*/
+ @Override
public ArrayList<Node> getChildren() {
if (getChildOperators() == null) {
@@ -851,6 +852,7 @@ public abstract class Operator<T extends
*
* @return the name of the operator
*/
+ @Override
public String getName() {
return getOperatorName();
}
@@ -1061,7 +1063,7 @@ public abstract class Operator<T extends
if (parents != null) {
for (Operator<? extends OperatorDesc> parent : parents) {
- parentClones.add((Operator<? extends OperatorDesc>)(parent.clone()));
+ parentClones.add((parent.clone()));
}
}
@@ -1082,8 +1084,8 @@ public abstract class Operator<T extends
public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
T descClone = (T) conf.clone();
Operator<? extends OperatorDesc> ret =
- (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
- descClone, getSchema());
+ OperatorFactory.getAndMakeChild(
+ descClone, getSchema());
return ret;
}
@@ -1254,15 +1256,15 @@ public abstract class Operator<T extends
}
return null;
}
-
+
public OpTraits getOpTraits() {
if (conf != null) {
return conf.getOpTraits();
}
-
+
return null;
}
-
+
public void setOpTraits(OpTraits metaInfo) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting traits ("+metaInfo+") on "+this);
@@ -1299,7 +1301,17 @@ public abstract class Operator<T extends
private static class DummyOperator extends Operator {
public DummyOperator() { super("dummy"); }
+ @Override
public void processOp(Object row, int tag) { }
+ @Override
public OperatorType getType() { return null; }
}
+
+ /**
+  * Looks up the tag-to-dummy-store mapping by delegating to the first parent operator.
+  *
+  * @return the mapping reported by the first parent, or null when there are no parents
+  */
+ public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
+   final boolean hasParents = (parentOperators != null) && !parentOperators.isEmpty();
+   return hasParents ? parentOperators.get(0).getTagToOperatorTree() : null;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Sep 24 07:03:35 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
import org.apache.hadoop.hive.ql.plan.DemuxDesc;
import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -115,6 +116,8 @@ public final class OperatorFactory {
RCFileMergeOperator.class));
opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
OrcFileMergeOperator.class));
+ opvec.add(new OpTuple<CommonMergeJoinDesc>(CommonMergeJoinDesc.class,
+ CommonMergeJoinOperator.class));
}
static {
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * The Tez variant of the dummy store operator. Having a separate class means we do not have
+ * to check for Tez every time before forwarding a record: on Tez, records flow down from the
+ * dummy store operator during the processOp phase, unlike in MapReduce.
+ *
+ */
+public class TezDummyStoreOperator extends DummyStoreOperator {
+
+ /**
+ * Unlike the MR counterpart, on Tez we want processOp to forward
+ * the records: after the superclass has handled the row, the stored result object is
+ * forwarded with this operator's output object inspector.
+ */
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ super.processOp(row, tag);
+ forward(result.o, outputObjInspector);
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Sep 24 07:03:35 2014
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -200,6 +201,8 @@ public final class Utilities {
public static String HADOOP_LOCAL_FS = "file:///";
public static String MAP_PLAN_NAME = "map.xml";
public static String REDUCE_PLAN_NAME = "reduce.xml";
+ public static String MERGE_PLAN_NAME = "merge.xml";
+ public static final String INPUT_NAME = "iocontext.input.name";
public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
@@ -290,6 +293,39 @@ public final class Utilities {
return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
}
+ /**
+  * Stores every base work of the given merge join work through setBaseWork, using
+  * "work name + MERGE_PLAN_NAME" as the plan name, and appends the work name to the
+  * comma-separated list kept in the configuration under
+  * DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES.
+  *
+  * @param conf          configuration that receives the plans and the prefix list
+  * @param mergeJoinWork merge join work whose base work list is stored
+  * @param mrScratchDir  scratch directory handed to setBaseWork
+  * @param useCache      cache flag handed to setBaseWork
+  * @return always null; there is nothing to return
+  */
+ public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir,
+     boolean useCache) {
+   for (BaseWork work : mergeJoinWork.getBaseWorkList()) {
+     setBaseWork(conf, work, mrScratchDir, work.getName() + MERGE_PLAN_NAME, useCache);
+
+     // The list is re-read on every iteration so that the names accumulate in the conf.
+     final String knownPrefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+     final String workName = work.getName();
+     conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES,
+         knownPrefixes == null ? workName : knownPrefixes + "," + workName);
+   }
+
+   // nothing to return
+   return null;
+ }
+
+ /**
+  * Returns the merge work for the prefix stored in the configuration under
+  * DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX.
+  *
+  * @param jconf configuration to read the current merge file prefix from
+  * @return the result of getMergeWork(jconf, prefix), or null when the prefix is not set
+  *         or is empty
+  */
+ public static BaseWork getMergeWork(JobConf jconf) {
+   final String currentPrefix = jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX);
+   final boolean prefixMissing = (currentPrefix == null) || currentPrefix.isEmpty();
+   if (prefixMissing) {
+     return null;
+   }
+   return getMergeWork(jconf, currentPrefix);
+ }
+
+ /**
+  * Returns the base work registered under the plan name "prefix + MERGE_PLAN_NAME".
+  *
+  * @param jconf  configuration handed to getBaseWork
+  * @param prefix prefix of the merge plan name
+  * @return the result of getBaseWork, or null when the prefix is null or empty
+  */
+ public static BaseWork getMergeWork(JobConf jconf, String prefix) {
+   final boolean hasPrefix = (prefix != null) && !prefix.isEmpty();
+   return hasPrefix ? getBaseWork(jconf, prefix + MERGE_PLAN_NAME) : null;
+ }
+
public static void cacheBaseWork(Configuration conf, String name, BaseWork work,
Path hiveScratchDir) {
try {
@@ -368,6 +404,8 @@ public final class Utilities {
throw new RuntimeException("unable to determine work from configuration ."
+ MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
}
+ } else if (name.contains(MERGE_PLAN_NAME)) {
+ gWork = deserializePlan(in, MapWork.class, conf);
}
gWorkMap.put(path, gWork);
} else {
@@ -600,8 +638,14 @@ public final class Utilities {
}
public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) {
+ String useName = conf.get(INPUT_NAME);
+ if (useName == null) {
+ useName = "mapreduce";
+ }
+ conf.set(INPUT_NAME, useName);
setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
if (w.getReduceWork() != null) {
+ conf.set(INPUT_NAME, useName);
setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true);
}
}
@@ -1838,7 +1882,7 @@ public final class Utilities {
for (int i = 0; i < parts.length; ++i) {
assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
- + " is not a direcgtory";
+ + " is not a directory";
FileStatus[] items = fs.listStatus(parts[i].getPath());
// remove empty directory since DP insert should not generate empty partitions.
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Wed Sep 24 07:03:35 2014
@@ -78,10 +78,11 @@ public class ExecMapper extends MapReduc
private MapredLocalWork localWork = null;
private boolean isLogInfoEnabled = false;
- private final ExecMapperContext execContext = new ExecMapperContext();
+ private ExecMapperContext execContext = null;
@Override
public void configure(JobConf job) {
+ execContext = new ExecMapperContext(job);
// Allocate the bean at the beginning -
memoryMXBean = ManagementFactory.getMemoryMXBean();
l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
@@ -292,6 +293,7 @@ public class ExecMapper extends MapReduc
this.rp = rp;
}
+ @Override
public void func(Operator op) {
Map<Enum<?>, Long> opStats = op.getStats();
for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java Wed Sep 24 07:03:35 2014
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.mapred.JobConf;
@@ -60,8 +61,9 @@ public class ExecMapperContext {
this.currentBigBucketFile = currentBigBucketFile;
}
- public ExecMapperContext() {
- ioCxt = IOContext.get();
+ public ExecMapperContext(JobConf jc) {
+ this.jc = jc;
+ ioCxt = IOContext.get(jc.get(Utilities.INPUT_NAME));
}
public void clear() {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Wed Sep 24 07:03:35 2014
@@ -91,7 +91,7 @@ public class MapredLocalTask extends Tas
// not sure we need this exec context; but all the operators in the work
// will pass this context throught
- private ExecMapperContext execContext = new ExecMapperContext();
+ private ExecMapperContext execContext = null;
private Process executor;
@@ -113,6 +113,7 @@ public class MapredLocalTask extends Tas
public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
super.initialize(conf, queryPlan, driverContext);
job = new JobConf(conf, ExecDriver.class);
+ execContext = new ExecMapperContext(job);
//we don't use the HadoopJobExecHooks for local tasks
this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Wed Sep 24 07:03:35 2014
@@ -31,7 +31,10 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.FileSplit;
@@ -79,9 +82,14 @@ public class CustomPartitionVertex exten
private List<InputDataInformationEvent> dataInformationEvents;
private int numBuckets = -1;
private Configuration conf = null;
- private boolean rootVertexInitialized = false;
private final SplitGrouper grouper = new SplitGrouper();
private int taskCount = 0;
+ private VertexType vertexType;
+ private String mainWorkName;
+ private final Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
+
+ private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap =
+ new HashMap<String, Multimap<Integer, InputSplit>>();
public CustomPartitionVertex(VertexManagerPluginContext context) {
super(context);
@@ -90,8 +98,18 @@ public class CustomPartitionVertex exten
@Override
public void initialize() {
this.context = getContext();
- ByteBuffer byteBuf = context.getUserPayload().getPayload();
- this.numBuckets = byteBuf.getInt();
+ ByteBuffer payload = context.getUserPayload().getPayload();
+ CustomVertexConfiguration vertexConf = new CustomVertexConfiguration();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(payload);
+ try {
+ vertexConf.readFields(dibb);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ this.numBuckets = vertexConf.getNumBuckets();
+ this.mainWorkName = vertexConf.getInputName();
+ this.vertexType = vertexConf.getVertexType();
}
@Override
@@ -113,17 +131,12 @@ public class CustomPartitionVertex exten
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
}
- // One call per root Input - and for now only one is handled.
+ // One call per root Input
@Override
public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
List<Event> events) {
+ LOG.info("On root vertex initialized " + inputName);
- // Ideally, since there's only 1 Input expected at the moment -
- // ensure this method is called only once. Tez will call it once per Root
- // Input.
- Preconditions.checkState(rootVertexInitialized == false);
- LOG.info("Root vertex not initialized");
- rootVertexInitialized = true;
try {
// This is using the payload from the RootVertexInitializer corresponding
// to InputName. Ideally it should be using it's own configuration class -
@@ -164,9 +177,6 @@ public class CustomPartitionVertex exten
// No tasks should have been started yet. Checked by initial state
// check.
Preconditions.checkState(dataInformationEventSeen == false);
- Preconditions
- .checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
- "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
// The vertex cannot be configured until all DataEvents are seen - to
@@ -220,21 +230,55 @@ public class CustomPartitionVertex exten
(bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
Multimap<Integer, InputSplit> groupedSplit =
HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
- availableSlots);
+ availableSlots, inputName);
bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
}
- LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap.size() + " tasks");
- processAllEvents(inputName, bucketToGroupedSplitMap);
+ LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap);
+ if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) {
+ /*
+ * this is the small table side. In case of SMB join, we may need to send each split to the
+ * corresponding bucket-based task on the other side. In case a split needs to go to
+ * multiple downstream tasks, we need to clone the event and send it to the right
+ * destination.
+ */
+ processAllSideEvents(inputName, bucketToGroupedSplitMap);
+ } else {
+ processAllEvents(inputName, bucketToGroupedSplitMap);
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ /**
+  * Routes the grouped splits of a side input to the tasks that own the matching buckets.
+  * The bucket to task map should have been set up by the big table; while it is still
+  * empty the grouped splits are parked in inputToGroupedSplitMap under the input name and
+  * nothing is sent. Otherwise every task mapped to a bucket receives one
+  * InputDataInformationEvent per split of that bucket, and all events are handed to the
+  * context in a single addRootInputEvents call.
+  *
+  * @param inputName               name of the side input
+  * @param bucketToGroupedSplitMap grouped splits of that input, keyed by bucket
+  * @throws IOException declared for the split serialization path
+  */
+ private void processAllSideEvents(String inputName,
+     Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
+   if (bucketToTaskMap.isEmpty()) {
+     // No routing information yet: remember the splits for a later call.
+     inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
+     return;
+   }
+
+   List<InputDataInformationEvent> sideEvents = new ArrayList<InputDataInformationEvent>();
+   for (Entry<Integer, Collection<InputSplit>> bucketSplits
+       : bucketToGroupedSplitMap.asMap().entrySet()) {
+     Collection<InputSplit> splitsOfBucket = bucketSplits.getValue();
+     for (Integer taskIndex : bucketToTaskMap.get(bucketSplits.getKey())) {
+       for (InputSplit split : splitsOfBucket) {
+         // A split that goes to several tasks gets a separate event per destination.
+         InputDataInformationEvent sideEvent =
+             InputDataInformationEvent.createWithSerializedPayload(taskIndex,
+                 MRInputHelpers.createSplitProto(split).toByteString().asReadOnlyByteBuffer());
+         sideEvent.setTargetIndex(taskIndex);
+         sideEvents.add(sideEvent);
+       }
+     }
+   }
+
+   context.addRootInputEvents(inputName, sideEvents);
+ }
+
private void processAllEvents(String inputName,
Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
- Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
List<InputSplit> finalSplits = Lists.newLinkedList();
for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
int bucketNum = entry.getKey();
@@ -248,11 +292,13 @@ public class CustomPartitionVertex exten
// Construct the EdgeManager descriptor to be used by all edges which need
// the routing table.
- EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
- EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
- UserPayload payload = getBytePayload(bucketToTaskMap);
- hiveEdgeManagerDesc.setUserPayload(payload);
-
+ EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null;
+ if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES)
+ || (vertexType == VertexType.INITIALIZED_EDGES)) {
+ hiveEdgeManagerDesc = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+ UserPayload payload = getBytePayload(bucketToTaskMap);
+ hiveEdgeManagerDesc.setUserPayload(payload);
+ }
Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
// Replace the edge manager for all vertices which have routing type custom.
@@ -285,13 +331,21 @@ public class CustomPartitionVertex exten
rootInputSpecUpdate.put(
inputName,
InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
- context.setVertexParallelism(
- taskCount,
- VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
- .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
+ if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) {
+ context.setVertexParallelism(
+ taskCount,
+ VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
+ .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
+ }
// Set the actual events for the tasks.
context.addRootInputEvents(inputName, taskEvents);
+ if (inputToGroupedSplitMap.isEmpty() == false) {
+ for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) {
+ processAllSideEvents(entry.getKey(), entry.getValue());
+ }
+ inputToGroupedSplitMap.clear();
+ }
}
UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
@@ -315,7 +369,8 @@ public class CustomPartitionVertex exten
if (!(inputSplit instanceof FileSplit)) {
throw new UnsupportedOperationException(
- "Cannot handle splits other than FileSplit for the moment");
+ "Cannot handle splits other than FileSplit for the moment. Current input split type: "
+ + inputSplit.getClass().getSimpleName());
}
return (FileSplit) inputSplit;
}
@@ -327,7 +382,6 @@ public class CustomPartitionVertex exten
Map<String, List<FileSplit>> pathFileSplitsMap) {
int bucketNum = 0;
- int fsCount = 0;
Multimap<Integer, InputSplit> bucketToInitialSplitMap =
ArrayListMultimap.<Integer, InputSplit> create();
@@ -335,14 +389,20 @@ public class CustomPartitionVertex exten
for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
int bucketId = bucketNum % numBuckets;
for (FileSplit fsplit : entry.getValue()) {
- fsCount++;
bucketToInitialSplitMap.put(bucketId, fsplit);
}
bucketNum++;
}
- LOG.info("Total number of splits counted: " + fsCount + " and total files encountered: "
- + pathFileSplitsMap.size());
+ if (bucketNum < numBuckets) {
+ int loopedBucketId = 0;
+ for (; bucketNum < numBuckets; bucketNum++) {
+ for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) {
+ bucketToInitialSplitMap.put(bucketNum, fsplit);
+ }
+ loopedBucketId++;
+ }
+ }
return bucketToInitialSplitMap;
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
+import org.apache.hadoop.io.Writable;
+
+/*
+ * This class is the payload for custom vertex. It serializes and de-serializes
+ * @numBuckets: the number of buckets of the "big table"
+ * @vertexType: this is the type of vertex and differentiates between bucket map join and SMB joins
+ * @inputName: This is the name of the input. Used in case of SMB joins
+ */
+public class CustomVertexConfiguration implements Writable {
+
+ private int numBuckets;
+ private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES;
+ private String inputName;
+
+ public CustomVertexConfiguration() {
+ }
+
+ public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName) {
+ this.numBuckets = numBuckets;
+ this.vertexType = vertexType;
+ this.inputName = inputName;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(this.vertexType.ordinal());
+ out.writeInt(this.numBuckets);
+ out.writeUTF(inputName);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.vertexType = VertexType.values()[in.readInt()];
+ this.numBuckets = in.readInt();
+ this.inputName = in.readUTF();
+ }
+
+ public int getNumBuckets() {
+ return numBuckets;
+ }
+
+ public VertexType getVertexType() {
+ return vertexType;
+ }
+
+ public String getInputName() {
+ return inputName;
+ }
+}