Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 00:26:58 UTC
svn commit: r1629544 [9/33] - in /hive/branches/spark-new: ./
accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/
bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/
common/src/test/org/apache/hadoop/hive/common/type/ co...
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Sun Oct 5 22:26:43 2014
@@ -109,8 +109,8 @@ import org.apache.hadoop.hive.serde2.laz
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
import org.apache.thrift.TException;
import com.google.common.collect.Sets;
@@ -378,6 +378,27 @@ public class Hive {
List<String> partCols, Class<? extends InputFormat> fileInputFormat,
Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols)
throws HiveException {
+ createTable(tableName, columns, partCols, fileInputFormat, fileOutputFormat, bucketCount,
+ bucketCols, null);
+ }
+
+ /**
+ * Creates the table metadata and the directory for the table data
+ * @param tableName table name
+ * @param columns list of fields of the table
+ * @param partCols partition keys of the table
+ * @param fileInputFormat Class of the input format of the table data file
+ * @param fileOutputFormat Class of the output format of the table data file
+ * @param bucketCount number of buckets that each partition (or the table itself) should be
+ * divided into
+ * @param bucketCols Bucket columns
+ * @param parameters Parameters for the table
+ * @throws HiveException
+ */
+ public void createTable(String tableName, List<String> columns, List<String> partCols,
+ Class<? extends InputFormat> fileInputFormat,
+ Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols,
+ Map<String, String> parameters) throws HiveException {
if (columns == null) {
throw new HiveException("columns not specified for table " + tableName);
}
@@ -402,6 +423,9 @@ public class Hive {
tbl.setSerializationLib(LazySimpleSerDe.class.getName());
tbl.setNumBuckets(bucketCount);
tbl.setBucketCols(bucketCols);
+ if (parameters != null) {
+ tbl.setParameters(parameters);
+ }
createTable(tbl);
}
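For reference, a minimal, hypothetical usage sketch of the new createTable overload added above. It is not part of the patch: the HiveConf, the Hive.get(conf) handle, and the text input/output format classes are assumptions made only for illustration.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class CreateTableWithParamsSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        Hive db = Hive.get(conf);

        // Table-level parameters can now be supplied at creation time.
        Map<String, String> params = new HashMap<String, String>();
        params.put("comment", "created through the new createTable overload");

        db.createTable("web_logs",                   // table name (illustrative)
            Arrays.asList("ip", "url", "ts"),        // columns
            Arrays.asList("dt"),                     // partition keys
            TextInputFormat.class,                   // file input format
            HiveIgnoreKeyTextOutputFormat.class,     // file output format
            32,                                      // bucket count
            Arrays.asList("ip"),                     // bucket columns
            params);                                 // new: table parameters
      }
    }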
@@ -427,9 +451,9 @@ public class Hive {
newTbl.checkValidity();
getMSC().alter_table(names[0], names[1], newTbl.getTTable());
} catch (MetaException e) {
- throw new HiveException("Unable to alter table.", e);
+ throw new HiveException("Unable to alter table. " + e.getMessage(), e);
} catch (TException e) {
- throw new HiveException("Unable to alter table.", e);
+ throw new HiveException("Unable to alter table. " + e.getMessage(), e);
}
}
@@ -455,9 +479,9 @@ public class Hive {
try {
getMSC().alter_index(dbName, baseTblName, idxName, newIdx);
} catch (MetaException e) {
- throw new HiveException("Unable to alter index.", e);
+ throw new HiveException("Unable to alter index. " + e.getMessage(), e);
} catch (TException e) {
- throw new HiveException("Unable to alter index.", e);
+ throw new HiveException("Unable to alter index. " + e.getMessage(), e);
}
}
@@ -502,9 +526,9 @@ public class Hive {
getMSC().alter_partition(dbName, tblName, newPart.getTPartition());
} catch (MetaException e) {
- throw new HiveException("Unable to alter partition.", e);
+ throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
} catch (TException e) {
- throw new HiveException("Unable to alter partition.", e);
+ throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
}
}
@@ -534,9 +558,9 @@ public class Hive {
}
getMSC().alter_partitions(names[0], names[1], newTParts);
} catch (MetaException e) {
- throw new HiveException("Unable to alter partition.", e);
+ throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
} catch (TException e) {
- throw new HiveException("Unable to alter partition.", e);
+ throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
}
}
/**
@@ -578,11 +602,11 @@ public class Hive {
newPart.getTPartition());
} catch (InvalidOperationException e){
- throw new HiveException("Unable to rename partition.", e);
+ throw new HiveException("Unable to rename partition. " + e.getMessage(), e);
} catch (MetaException e) {
- throw new HiveException("Unable to rename partition.", e);
+ throw new HiveException("Unable to rename partition. " + e.getMessage(), e);
} catch (TException e) {
- throw new HiveException("Unable to rename partition.", e);
+ throw new HiveException("Unable to rename partition. " + e.getMessage(), e);
}
}
@@ -591,11 +615,11 @@ public class Hive {
try {
getMSC().alterDatabase(dbName, db);
} catch (MetaException e) {
- throw new HiveException("Unable to alter database " + dbName, e);
+ throw new HiveException("Unable to alter database " + dbName + ". " + e.getMessage(), e);
} catch (NoSuchObjectException e) {
throw new HiveException("Database " + dbName + " does not exists.", e);
} catch (TException e) {
- throw new HiveException("Unable to alter database " + dbName, e);
+ throw new HiveException("Unable to alter database " + dbName + ". " + e.getMessage(), e);
}
}
/**
@@ -870,14 +894,31 @@ public class Hive {
try {
return getMSC().dropIndex(db_name, tbl_name, index_name, deleteData);
} catch (NoSuchObjectException e) {
- throw new HiveException("Partition or table doesn't exist.", e);
+ throw new HiveException("Partition or table doesn't exist. " + e.getMessage(), e);
} catch (Exception e) {
- throw new HiveException("Unknown error. Please check logs.", e);
+ throw new HiveException(e.getMessage(), e);
}
}
/**
* Drops table along with the data in it. If the table doesn't exist then it
+ * is a no-op. If the ifPurge option is set, it is passed on to the
+ * HDFS command that removes the table data from the warehouse, so the data skips the trash.
+ *
+ * @param tableName
+ * table to drop
+ * @param ifPurge
+ * completely purge the table (skipping the trash) while removing data from the warehouse
+ * @throws HiveException
+ * thrown if the drop fails
+ */
+ public void dropTable(String tableName, boolean ifPurge) throws HiveException {
+ String[] names = Utilities.getDbTableName(tableName);
+ dropTable(names[0], names[1], true, true, ifPurge);
+ }
+
+ /**
+ * Drops table along with the data in it. If the table doesn't exist then it
* is a no-op
*
* @param tableName
@@ -886,8 +927,7 @@ public class Hive {
* thrown if the drop fails
*/
public void dropTable(String tableName) throws HiveException {
- String[] names = Utilities.getDbTableName(tableName);
- dropTable(names[0], names[1], true, true);
+ dropTable(tableName, false);
}
/**
@@ -902,7 +942,7 @@ public class Hive {
* thrown if the drop fails
*/
public void dropTable(String dbName, String tableName) throws HiveException {
- dropTable(dbName, tableName, true, true);
+ dropTable(dbName, tableName, true, true, false);
}
/**
@@ -913,14 +953,31 @@ public class Hive {
* @param deleteData
* deletes the underlying data along with metadata
* @param ignoreUnknownTab
- * an exception if thrown if this is falser and table doesn't exist
+ * an exception is thrown if this is false and the table doesn't exist
* @throws HiveException
*/
public void dropTable(String dbName, String tableName, boolean deleteData,
boolean ignoreUnknownTab) throws HiveException {
+ dropTable(dbName, tableName, deleteData, ignoreUnknownTab, false);
+ }
+ /**
+ * Drops the table.
+ *
+ * @param dbName
+ * @param tableName
+ * @param deleteData
+ * deletes the underlying data along with metadata
+ * @param ignoreUnknownTab
+ * an exception is thrown if this is false and the table doesn't exist
+ * @param ifPurge
+ * completely purge the table (skipping the trash) while removing data from the warehouse
+ * @throws HiveException
+ */
+ public void dropTable(String dbName, String tableName, boolean deleteData,
+ boolean ignoreUnknownTab, boolean ifPurge) throws HiveException {
try {
- getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab);
+ getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab, ifPurge);
} catch (NoSuchObjectException e) {
if (!ignoreUnknownTab) {
throw new HiveException(e);
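The new ifPurge flag threads through the dropTable overloads above down to the metastore client. A hypothetical caller-side sketch (the Hive handle and table name are assumptions, not part of this hunk):

    // Hypothetical usage; db is assumed to come from Hive.get(conf).
    Hive db = Hive.get(conf);

    // Skip the HDFS trash while dropping, roughly what DROP TABLE ... PURGE requests.
    db.dropTable("default.web_logs", true /* ifPurge */);

    // Fully explicit form: deleteData, ignoreUnknownTab, ifPurge.
    db.dropTable("default", "web_logs", true, true, true);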
@@ -1008,7 +1065,7 @@ public class Hive {
}
return null;
} catch (Exception e) {
- throw new HiveException("Unable to fetch table " + tableName, e);
+ throw new HiveException("Unable to fetch table " + tableName + ". " + e.getMessage(), e);
}
// For non-views, we need to do some extra fixes
@@ -1204,6 +1261,15 @@ public class Hive {
return getDatabase(currentDb);
}
+ public void loadPartition(Path loadPath, String tableName,
+ Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
+ boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
+ boolean isSrcLocal, boolean isAcid) throws HiveException {
+ Table tbl = getTable(tableName);
+ loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs,
+ isSkewedStoreAsSubdir, isSrcLocal, isAcid);
+ }
+
/**
* Load a directory into a Hive Table Partition - Alters existing content of
* the partition with the contents of loadPath. - If the partition does not
@@ -1212,7 +1278,7 @@ public class Hive {
*
* @param loadPath
* Directory containing files to load into Table
- * @param tableName
+ * @param tbl
* table to be loaded.
* @param partSpec
* defines which partition needs to be loaded
@@ -1225,12 +1291,12 @@ public class Hive {
* @param isSrcLocal
* If the source directory is LOCAL
*/
- public void loadPartition(Path loadPath, String tableName,
+ public Partition loadPartition(Path loadPath, Table tbl,
Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
boolean isSrcLocal, boolean isAcid) throws HiveException {
- Table tbl = getTable(tableName);
Path tblDataLocationPath = tbl.getDataLocation();
+ Partition newTPart = null;
try {
/**
* Move files before creating the partition since down stream processes
@@ -1279,10 +1345,10 @@ public class Hive {
Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
}
+ boolean forceCreate = !holdDDLTime;
+ newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs);
// recreate the partition if it existed before
if (!holdDDLTime) {
- Partition newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(),
- inheritTableSpecs);
if (isSkewedStoreAsSubdir) {
org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
@@ -1292,9 +1358,9 @@ public class Hive {
/* Add list bucketing location mappings. */
skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
- alterPartition(tbl.getTableName(), new Partition(tbl, newCreatedTpart));
+ alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
- newCreatedTpart = newTPart.getTPartition();
+ return new Partition(tbl, newCreatedTpart);
}
}
} catch (IOException e) {
@@ -1307,7 +1373,7 @@ public class Hive {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
}
-
+ return newTPart;
}
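loadPartition now accepts a resolved Table and returns the Partition it created or refreshed, so callers can avoid a second metastore round trip. A hedged sketch of the new call pattern (the Hive handle, path, and partition values are made up for illustration):

    // Hypothetical usage; db is assumed to come from Hive.get(conf).
    Table tbl = db.getTable("web_logs");
    Map<String, String> spec = new LinkedHashMap<String, String>();
    spec.put("dt", "2014-10-05");

    Partition part = db.loadPartition(
        new Path("/tmp/staging/dt=2014-10-05"),  // loadPath
        tbl,                                     // resolved Table instead of a name
        spec,                                    // partition to load
        true,                                    // replace
        false,                                   // holdDDLTime
        true,                                    // inheritTableSpecs
        false,                                   // isSkewedStoreAsSubdir
        false,                                   // isSrcLocal
        false);                                  // isAcid
    // The returned Partition can be inspected directly, e.g. part.getDataLocation().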
/**
@@ -1403,18 +1469,18 @@ private void constructOneLBLocationMap(F
* @param replace
* @param numDP number of dynamic partitions
* @param holdDDLTime
- * @return a list of strings with the dynamic partition paths
+ * @return a map from each dynamic partition's full spec to the corresponding Partition object
* @throws HiveException
*/
- public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
+ public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
String tableName, Map<String, String> partSpec, boolean replace,
int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid)
throws HiveException {
Set<Path> validPartitions = new HashSet<Path>();
try {
- ArrayList<LinkedHashMap<String, String>> fullPartSpecs =
- new ArrayList<LinkedHashMap<String, String>>();
+ Map<Map<String, String>, Partition> partitionsMap = new
+ LinkedHashMap<Map<String, String>, Partition>();
FileSystem fs = loadPath.getFileSystem(conf);
FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs);
@@ -1448,6 +1514,7 @@ private void constructOneLBLocationMap(F
+ " to at least " + validPartitions.size() + '.');
}
+ Table tbl = getTable(tableName);
// for each dynamically created DP directory, construct a full partition spec
// and load the partition based on that
Iterator<Path> iter = validPartitions.iterator();
@@ -1460,14 +1527,12 @@ private void constructOneLBLocationMap(F
// generate a full partition specification
LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
Warehouse.makeSpecFromName(fullPartSpec, partPath);
- fullPartSpecs.add(fullPartSpec);
-
- // finally load the partition -- move the file to the final table address
- loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
- listBucketingEnabled, false, isAcid);
+ Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
+ holdDDLTime, true, listBucketingEnabled, false, isAcid);
+ partitionsMap.put(fullPartSpec, newPartition);
LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
}
- return fullPartSpecs;
+ return partitionsMap;
} catch (IOException e) {
throw new HiveException(e);
}
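loadDynamicPartitions now returns the created Partition objects keyed by their full partition specs rather than just the list of specs. A hypothetical sketch of consuming the new return value (db, loadPath, and partSpec are assumed to exist):

    // Hypothetical usage; not part of this commit.
    Map<Map<String, String>, Partition> loaded =
        db.loadDynamicPartitions(loadPath, "web_logs", partSpec,
            true  /* replace */, 1 /* numDP */,
            false /* holdDDLTime */, false /* listBucketingEnabled */,
            false /* isAcid */);

    for (Map.Entry<Map<String, String>, Partition> entry : loaded.entrySet()) {
      // Key: the full dynamic partition spec, e.g. {dt=2014-10-05}.
      // Value: the Partition that was just created or refreshed.
      String location = entry.getValue().getTPartition().getSd().getLocation();
      System.out.println(entry.getKey() + " -> " + location);
    }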
@@ -1500,6 +1565,7 @@ private void constructOneLBLocationMap(F
tbl.replaceFiles(loadPath, isSrcLocal);
} else {
tbl.copyFiles(loadPath, isSrcLocal, isAcid);
+ tbl.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, "true");
}
try {
@@ -1613,17 +1679,6 @@ private void constructOneLBLocationMap(F
return getPartition(tbl, partSpec, forceCreate, null, true);
}
- private static void clearPartitionStats(org.apache.hadoop.hive.metastore.api.Partition tpart) {
- Map<String,String> tpartParams = tpart.getParameters();
- if (tpartParams == null) {
- return;
- }
-
- for (String statType : StatsSetupConst.supportedStats) {
- tpartParams.remove(statType);
- }
- }
-
/**
* Returns partition metadata
*
@@ -1691,7 +1746,7 @@ private void constructOneLBLocationMap(F
throw new HiveException("new partition path should not be null or empty.");
}
tpart.getSd().setLocation(partPath);
- clearPartitionStats(tpart);
+ tpart.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK,"true");
String fullName = tbl.getTableName();
if (!org.apache.commons.lang.StringUtils.isEmpty(tbl.getDbName())) {
fullName = tbl.getDbName() + "." + tbl.getTableName();
@@ -1722,7 +1777,7 @@ private void constructOneLBLocationMap(F
} catch (NoSuchObjectException e) {
throw new HiveException("Partition or table doesn't exist.", e);
} catch (Exception e) {
- throw new HiveException("Unknown error. Please check logs.", e);
+ throw new HiveException(e.getMessage(), e);
}
}
@@ -1736,6 +1791,7 @@ private void constructOneLBLocationMap(F
public List<Partition> dropPartitions(String dbName, String tblName,
List<DropTableDesc.PartSpec> partSpecs, boolean deleteData, boolean ignoreProtection,
boolean ifExists) throws HiveException {
+ //TODO: add support for ifPurge
try {
Table tbl = getTable(dbName, tblName);
List<ObjectPair<Integer, byte[]>> partExprs =
@@ -1750,7 +1806,7 @@ private void constructOneLBLocationMap(F
} catch (NoSuchObjectException e) {
throw new HiveException("Partition or table doesn't exist.", e);
} catch (Exception e) {
- throw new HiveException("Unknown error. Please check logs.", e);
+ throw new HiveException(e.getMessage(), e);
}
}
@@ -2243,7 +2299,7 @@ private void constructOneLBLocationMap(F
result.add(srcToDest);
}
} catch (IOException e) {
- throw new HiveException("checkPaths: filesystem error in check phase", e);
+ throw new HiveException("checkPaths: filesystem error in check phase. " + e.getMessage(), e);
}
return result;
}
@@ -2310,7 +2366,7 @@ private void constructOneLBLocationMap(F
try {
ShimLoader.getHadoopShims().setFullFileStatus(conf, destStatus, fs, destf);
} catch (IOException e) {
- LOG.warn("Error setting permission of file " + destf + ": "+ StringUtils.stringifyException(e));
+ LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e);
}
}
return success;
@@ -2349,7 +2405,7 @@ private void constructOneLBLocationMap(F
srcs = srcFs.globStatus(srcf);
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
- throw new HiveException("addFiles: filesystem error in check phase", e);
+ throw new HiveException("addFiles: filesystem error in check phase. " + e.getMessage(), e);
}
if (srcs == null) {
LOG.info("No sources specified to move: " + srcf);
@@ -2375,7 +2431,7 @@ private void constructOneLBLocationMap(F
}
}
} catch (IOException e) {
- throw new HiveException("copyFiles: error while moving files!!!", e);
+ throw new HiveException("copyFiles: error while moving files!!! " + e.getMessage(), e);
}
}
}
@@ -2447,7 +2503,7 @@ private void constructOneLBLocationMap(F
fs.rename(bucketSrc, bucketDest);
}
} catch (IOException e) {
- throw new HiveException("Error moving acid files", e);
+ throw new HiveException("Error moving acid files " + e.getMessage(), e);
}
}
}
@@ -2679,7 +2735,7 @@ private void constructOneLBLocationMap(F
throw new HiveException(e);
}
}
-
+
public boolean setPartitionColumnStatistics(SetPartitionsStatsRequest request) throws HiveException {
try {
return getMSC().setPartitionColumnStatistics(request);
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Sun Oct 5 22:26:43 2014
@@ -451,7 +451,11 @@ public class SessionHiveMetaStoreClient
// Delete table data
if (deleteData && !MetaStoreUtils.isExternalTable(table)) {
try {
- getWh().deleteDir(tablePath, true);
+ boolean ifPurge = false;
+ if (envContext != null){
+ ifPurge = Boolean.parseBoolean(envContext.getProperties().get("ifPurge"));
+ }
+ getWh().deleteDir(tablePath, true, ifPurge);
} catch (Exception err) {
LOG.error("Failed to delete temp table directory: " + tablePath, err);
// Forgive error
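The temp-table path now honours the same "ifPurge" property that the metastore code reads from the EnvironmentContext, as shown in the hunk above. A hypothetical sketch of how a caller could set that flag, assuming the thrift-generated EnvironmentContext exposes putToProperties (the surrounding drop call is omitted):

    // Hypothetical usage; not part of this commit.
    EnvironmentContext envContext = new EnvironmentContext();
    envContext.putToProperties("ifPurge", "true");
    // A drop request carrying this context deletes the table directory with
    // deleteDir(path, true, true), i.e. without moving it to the trash.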
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Sun Oct 5 22:26:43 2014
@@ -385,6 +385,10 @@ public class Table implements Serializab
tTable.getParameters().put(name, value);
}
+ public void setParameters(Map<String, String> params) {
+ tTable.setParameters(params);
+ }
+
public String getProperty(String name) {
return tTable.getParameters().get(name);
}
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Sun Oct 5 22:26:43 2014
@@ -670,10 +670,15 @@ public final class ConstantPropagateProc
cppCtx.getOpToConstantExprs().put(op, constants);
foldOperator(op, cppCtx);
List<ExprNodeDesc> colList = op.getConf().getColList();
+ List<String> columnNames = op.getConf().getOutputColumnNames();
+ Map<String, ExprNodeDesc> columnExprMap = op.getColumnExprMap();
if (colList != null) {
for (int i = 0; i < colList.size(); i++) {
ExprNodeDesc newCol = foldExpr(colList.get(i), constants, cppCtx, op, 0, false);
colList.set(i, newCol);
+ if (columnExprMap != null) {
+ columnExprMap.put(columnNames.get(i), newCol);
+ }
}
LOG.debug("New column list:(" + StringUtils.join(colList, " ") + ")");
}
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Sun Oct 5 22:26:43 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.optimizer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -29,12 +30,17 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MuxOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -42,12 +48,16 @@ import org.apache.hadoop.hive.ql.parse.O
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OpTraits;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* ConvertJoinMapJoin is an optimization that replaces a common join
@@ -60,39 +70,46 @@ public class ConvertJoinMapJoin implemen
static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
+ @SuppressWarnings("unchecked")
@Override
- /*
- * (non-Javadoc)
- * we should ideally not modify the tree we traverse.
- * However, since we need to walk the tree at any time when we modify the
- * operator, we might as well do it here.
- */
- public Object process(Node nd, Stack<Node> stack,
- NodeProcessorCtx procCtx, Object... nodeOutputs)
- throws SemanticException {
+ /*
+ * (non-Javadoc) we should ideally not modify the tree we traverse. However,
+ * since we need to walk the tree at any time when we modify the operator, we
+ * might as well do it here.
+ */
+ public Object
+ process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
- if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
+ JoinOperator joinOp = (JoinOperator) nd;
+
+ if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
+ && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
+ // we are just converting to a common merge join operator. The shuffle
+ // join in map-reduce case.
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
return null;
}
- JoinOperator joinOp = (JoinOperator) nd;
- // if we have traits, and table info is present in the traits, we know the
+ // if we have traits, and table info is present in the traits, we know the
// exact number of buckets. Else choose the largest number of estimated
// reducers from the parent operators.
int numBuckets = -1;
int estimatedBuckets = -1;
+ TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
if (parentOp.getOpTraits().getNumBuckets() > 0) {
- numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
- parentOp.getOpTraits().getNumBuckets() : numBuckets;
+ numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
+ parentOp.getOpTraits().getNumBuckets() : numBuckets;
}
if (parentOp instanceof ReduceSinkOperator) {
ReduceSinkOperator rs = (ReduceSinkOperator)parentOp;
- estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
+ estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
rs.getConf().getNumReducers() : estimatedBuckets;
}
}
@@ -107,29 +124,80 @@ public class ConvertJoinMapJoin implemen
numBuckets = 1;
}
LOG.info("Estimated number of buckets " + numBuckets);
- int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets);
+ int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
if (mapJoinConversionPos < 0) {
- // we cannot convert to bucket map join, we cannot convert to
- // map join either based on the size
+ // we cannot convert to bucket map join, we cannot convert to
+ // map join either based on the size. Check if we can convert to SMB join.
+ if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
+ convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+ return null;
+ }
+ Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
+ try {
+ bigTableMatcherClass =
+ (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
+ context.parseContext.getConf(),
+ HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
+ } catch (ClassNotFoundException e) {
+ throw new SemanticException(e.getMessage());
+ }
+
+ BigTableSelectorForAutoSMJ bigTableMatcher =
+ ReflectionUtils.newInstance(bigTableMatcherClass, null);
+ JoinDesc joinDesc = joinOp.getConf();
+ JoinCondDesc[] joinCondns = joinDesc.getConds();
+ Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
+ if (joinCandidates.isEmpty()) {
+ // This is a full outer join. This can never be a map-join
+ // of any type. So return false.
+ return false;
+ }
+ mapJoinConversionPos =
+ bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
+ if (mapJoinConversionPos < 0) {
+ // contains aliases from sub-query
+ // we are just converting to a common merge join operator. The shuffle
+ // join in map-reduce case.
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+ return null;
+ }
+
+ if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+ convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
+ tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
+ } else {
+ // we are just converting to a common merge join operator. The shuffle
+ // join in map-reduce case.
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+ }
return null;
}
- if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
- if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) {
- return null;
+ if (numBuckets > 1) {
+ if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
+ if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+ return null;
+ }
}
}
LOG.info("Convert to non-bucketed map join");
// check if we can convert to map join no bucket scaling.
- mapJoinConversionPos = mapJoinConversionPos(joinOp, context, 1);
+ mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
if (mapJoinConversionPos < 0) {
+ // we are just converting to a common merge join operator. The shuffle
+ // join in map-reduce case.
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
return null;
}
MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
// map join operator by default has no bucket cols
- mapJoinOp.setOpTraits(new OpTraits(null, -1));
+ mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
+ mapJoinOp.setStatistics(joinOp.getStatistics());
// propagate this change till the next RS
for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
setAllChildrenTraitsToNull(childOp);
@@ -138,11 +206,107 @@ public class ConvertJoinMapJoin implemen
return null;
}
+ // replaces the join operator with a new CommonJoinOperator, removes the
+ // parent reduce sinks
+ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
+ throws SemanticException {
+ ParseContext parseContext = context.parseContext;
+ MapJoinDesc mapJoinDesc = null;
+ if (adjustParentsChildren) {
+ mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
+ joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
+ } else {
+ JoinDesc joinDesc = joinOp.getConf();
+ // retain the original join desc in the map join.
+ mapJoinDesc =
+ new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
+ joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
+ joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+ }
+
+ @SuppressWarnings("unchecked")
+ CommonMergeJoinOperator mergeJoinOp =
+ (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
+ isSubQuery, mapJoinConversionPos, mapJoinDesc));
+ OpTraits opTraits =
+ new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
+ .getSortCols());
+ mergeJoinOp.setOpTraits(opTraits);
+ mergeJoinOp.setStatistics(joinOp.getStatistics());
+
+ for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+ int pos = parentOp.getChildOperators().indexOf(joinOp);
+ parentOp.getChildOperators().remove(pos);
+ parentOp.getChildOperators().add(pos, mergeJoinOp);
+ }
+
+ for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) {
+ int pos = childOp.getParentOperators().indexOf(joinOp);
+ childOp.getParentOperators().remove(pos);
+ childOp.getParentOperators().add(pos, mergeJoinOp);
+ }
+
+ List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators();
+ if (childOperators == null) {
+ childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+ mergeJoinOp.setChildOperators(childOperators);
+ }
+
+ List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators();
+ if (parentOperators == null) {
+ parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+ mergeJoinOp.setParentOperators(parentOperators);
+ }
+
+ childOperators.clear();
+ parentOperators.clear();
+ childOperators.addAll(joinOp.getChildOperators());
+ parentOperators.addAll(joinOp.getParentOperators());
+ mergeJoinOp.getConf().setGenJoinKeys(false);
+
+ if (adjustParentsChildren) {
+ mergeJoinOp.getConf().setGenJoinKeys(true);
+ List<Operator<? extends OperatorDesc>> newParentOpList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) {
+ for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
+ grandParentOp.getChildOperators().remove(parentOp);
+ grandParentOp.getChildOperators().add(mergeJoinOp);
+ newParentOpList.add(grandParentOp);
+ }
+ }
+ mergeJoinOp.getParentOperators().clear();
+ mergeJoinOp.getParentOperators().addAll(newParentOpList);
+ List<Operator<? extends OperatorDesc>> parentOps =
+ new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators());
+ for (Operator<? extends OperatorDesc> parentOp : parentOps) {
+ int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp);
+ if (parentIndex == mapJoinConversionPos) {
+ continue;
+ }
+
+ // insert the dummy store operator here
+ DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
+ dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+ dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+ dummyStoreOp.getChildOperators().add(mergeJoinOp);
+ int index = parentOp.getChildOperators().indexOf(mergeJoinOp);
+ parentOp.getChildOperators().remove(index);
+ parentOp.getChildOperators().add(index, dummyStoreOp);
+ dummyStoreOp.getParentOperators().add(parentOp);
+ mergeJoinOp.getParentOperators().remove(parentIndex);
+ mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
+ }
+ }
+ mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
+ }
+
private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) {
if (currentOp instanceof ReduceSinkOperator) {
return;
}
- currentOp.setOpTraits(new OpTraits(null, -1));
+ currentOp.setOpTraits(new OpTraits(null, -1, null));
for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
break;
@@ -151,28 +315,26 @@ public class ConvertJoinMapJoin implemen
}
}
- private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
- int bigTablePosition) throws SemanticException {
-
- TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
+ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) {
LOG.info("Check conversion to bucket map join failed.");
return false;
}
- MapJoinOperator mapJoinOp =
- convertJoinMapJoin(joinOp, context, bigTablePosition);
+ MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
MapJoinDesc joinDesc = mapJoinOp.getConf();
joinDesc.setBucketMapJoin(true);
// we can set the traits for this join operator
OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
- tezBucketJoinProcCtx.getNumBuckets());
+ tezBucketJoinProcCtx.getNumBuckets(), null);
mapJoinOp.setOpTraits(opTraits);
+ mapJoinOp.setStatistics(joinOp.getStatistics());
setNumberOfBucketsOnChildren(mapJoinOp);
- // Once the conversion is done, we can set the partitioner to bucket cols on the small table
+ // Once the conversion is done, we can set the partitioner to bucket cols on the small table
Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>();
bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets());
joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping);
@@ -182,6 +344,54 @@ public class ConvertJoinMapJoin implemen
return true;
}
+ /*
+ * This method tries to convert a join to an SMB join. This is done based on
+ * traits. If the sort columns are the same as the join columns, we
+ * can convert the join to an SMB join. Otherwise, retain the bucket map join, as it
+ * is still more efficient than a regular shuffle join.
+ */
+ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+
+ ReduceSinkOperator bigTableRS =
+ (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+ int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits()
+ .getNumBuckets();
+
+ // the sort and bucket cols have to match on both sides for this
+ // transformation of the join operation
+ for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+ if (!(parentOp instanceof ReduceSinkOperator)) {
+ // could be mux/demux operators. Currently not supported
+ LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time.");
+ return false;
+ }
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
+ if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
+ .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) {
+ LOG.info("We cannot convert to SMB because the sort column names do not match.");
+ return false;
+ }
+
+ if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp
+ .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx)
+ == false) {
+ LOG.info("We cannot convert to SMB because bucket column names do not match.");
+ return false;
+ }
+ }
+
+ boolean isSubQuery = false;
+ if (numBuckets < 0) {
+ isSubQuery = true;
+ numBuckets = bigTableRS.getConf().getNumReducers();
+ }
+ tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+ tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+ LOG.info("We can convert the join to an SMB join.");
+ return true;
+ }
+
private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
int numBuckets = currentOp.getOpTraits().getNumBuckets();
for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
@@ -193,15 +403,13 @@ public class ConvertJoinMapJoin implemen
}
/*
- * We perform the following checks to see if we can convert to a bucket map join
- * 1. If the parent reduce sink of the big table side has the same emit key cols as
- * its parent, we can create a bucket map join eliminating the reduce sink.
- * 2. If we have the table information, we can check the same way as in Mapreduce to
- * determine if we can perform a Bucket Map Join.
+ * If the parent reduce sink of the big table side has the same emit key cols
+ * as its parent, we can create a bucket map join eliminating the reduce sink.
*/
- private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
- OptimizeTezProcContext context, int bigTablePosition,
- TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+ private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
+ OptimizeTezProcContext context, int bigTablePosition,
+ TezBucketJoinProcCtx tezBucketJoinProcCtx)
+ throws SemanticException {
// bail on mux-operator because mux operator masks the emit keys of the
// constituent reduce sinks
if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) {
@@ -211,14 +419,41 @@ public class ConvertJoinMapJoin implemen
}
ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+ List<List<String>> parentColNames = rs.getOpTraits().getBucketColNames();
+ Operator<? extends OperatorDesc> parentOfParent = rs.getParentOperators().get(0);
+ List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames();
+ int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
+ // bail out unless all bucket columns of the reduce sink match its parent's bucket columns.
+ if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(),
+ tezBucketJoinProcCtx) == false) {
+ LOG.info("No info available to check for bucket map join. Cannot convert");
+ return false;
+ }
+
/*
* this is the case when the big table is a sub-query and is probably
- * already bucketed by the join column in say a group by operation
+ * already bucketed by the join column in say a group by operation
*/
- List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames();
- if ((colNames != null) && (colNames.isEmpty() == false)) {
- Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0);
- for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) {
+ boolean isSubQuery = false;
+ if (numBuckets < 0) {
+ isSubQuery = true;
+ numBuckets = rs.getConf().getNumReducers();
+ }
+ tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+ tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+ return true;
+ }
+
+ private boolean checkColEquality(List<List<String>> grandParentColNames,
+ List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap,
+ TezBucketJoinProcCtx tezBucketJoinProcCtx) {
+
+ if ((grandParentColNames == null) || (parentColNames == null)) {
+ return false;
+ }
+
+ if ((parentColNames != null) && (parentColNames.isEmpty() == false)) {
+ for (List<String> listBucketCols : grandParentColNames) {
// can happen if this operator does not carry forward the previous bucketing columns
// for e.g. another join operator which does not carry one of the sides' key columns
if (listBucketCols.isEmpty()) {
@@ -226,9 +461,9 @@ public class ConvertJoinMapJoin implemen
}
int colCount = 0;
// parent op is guaranteed to have a single list because it is a reduce sink
- for (String colName : rs.getOpTraits().getBucketColNames().get(0)) {
+ for (String colName : parentColNames.get(0)) {
// all columns need to be at least a subset of the parentOfParent's bucket cols
- ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName);
+ ExprNodeDesc exprNodeDesc = colExprMap.get(colName);
if (exprNodeDesc instanceof ExprNodeColumnDesc) {
if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) {
colCount++;
@@ -236,32 +471,21 @@ public class ConvertJoinMapJoin implemen
break;
}
}
-
- if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) {
- // all keys matched.
- int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
- boolean isSubQuery = false;
- if (numBuckets < 0) {
- isSubQuery = true;
- numBuckets = rs.getConf().getNumReducers();
- }
- tezBucketJoinProcCtx.setNumBuckets(numBuckets);
- tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+
+ if (colCount == parentColNames.get(0).size()) {
return true;
}
}
}
return false;
}
-
- LOG.info("No info available to check for bucket map join. Cannot convert");
return false;
}
- public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
+ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
int buckets) {
- Set<Integer> bigTableCandidateSet = MapJoinProcessor.
- getBigTableCandidates(joinOp.getConf().getConds());
+ Set<Integer> bigTableCandidateSet =
+ MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
long maxSize = context.conf.getLongVar(
HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
@@ -287,7 +511,7 @@ public class ConvertJoinMapJoin implemen
long inputSize = currInputStat.getDataSize();
if ((bigInputStat == null) ||
((bigInputStat != null) &&
- (inputSize > bigInputStat.getDataSize()))) {
+ (inputSize > bigInputStat.getDataSize()))) {
if (bigTableFound) {
// cannot convert to map join; we've already chosen a big table
@@ -347,9 +571,9 @@ public class ConvertJoinMapJoin implemen
* for tez.
*/
- public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
int bigTablePosition) throws SemanticException {
- // bail on mux operator because currently the mux operator masks the emit keys
+ // bail on mux operator because currently the mux operator masks the emit keys
// of the constituent reduce sinks.
for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
if (parentOp instanceof MuxOperator) {
@@ -359,12 +583,12 @@ public class ConvertJoinMapJoin implemen
//can safely convert the join to a map join.
ParseContext parseContext = context.parseContext;
- MapJoinOperator mapJoinOp = MapJoinProcessor.
- convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(),
- joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+ MapJoinOperator mapJoinOp =
+ MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
+ parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
- Operator<? extends OperatorDesc> parentBigTableOp
- = mapJoinOp.getParentOperators().get(bigTablePosition);
+ Operator<? extends OperatorDesc> parentBigTableOp =
+ mapJoinOp.getParentOperators().get(bigTablePosition);
if (parentBigTableOp instanceof ReduceSinkOperator) {
for (Operator<?> p : parentBigTableOp.getParentOperators()) {
// we might have generated a dynamic partition operator chain. Since
@@ -380,11 +604,10 @@ public class ConvertJoinMapJoin implemen
}
}
mapJoinOp.getParentOperators().remove(bigTablePosition);
- if (!(mapJoinOp.getParentOperators().contains(
- parentBigTableOp.getParentOperators().get(0)))) {
+ if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
mapJoinOp.getParentOperators().add(bigTablePosition,
parentBigTableOp.getParentOperators().get(0));
- }
+ }
parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) {
if (!(op.getChildOperators().contains(mapJoinOp))) {
@@ -397,15 +620,31 @@ public class ConvertJoinMapJoin implemen
return mapJoinOp;
}
- private boolean hasDynamicPartitionBroadcast(Operator<?> op) {
- if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
- return true;
- }
- for (Operator<?> c : op.getChildOperators()) {
- if (hasDynamicPartitionBroadcast(c)) {
- return true;
+ private boolean hasDynamicPartitionBroadcast(Operator<?> parent) {
+ boolean hasDynamicPartitionPruning = false;
+
+ for (Operator<?> op: parent.getChildOperators()) {
+ while (op != null) {
+ if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+ // found dynamic partition pruning operator
+ hasDynamicPartitionPruning = true;
+ break;
+ }
+
+ if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
+ // crossing reduce sink or file sink means the pruning isn't for this parent.
+ break;
+ }
+
+ if (op.getChildOperators().size() != 1) {
+ // dynamic partition pruning pipeline doesn't have multiple children
+ break;
+ }
+
+ op = op.getChildOperators().get(0);
}
}
- return false;
+
+ return hasDynamicPartitionPruning;
}
}
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Sun Oct 5 22:26:43 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.optimizer;
+import com.google.common.collect.Interner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -39,8 +40,6 @@ import org.apache.hadoop.hive.ql.exec.No
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator;
-import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -101,7 +100,6 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import java.io.Serializable;
@@ -580,8 +578,6 @@ public final class GenMapRedUtils {
//This read entity is a direct read entity and not an indirect read (that is when
// this is being read because it is a dependency of a view).
boolean isDirectRead = (parentViewInfo == null);
- PlanUtils.addInput(inputs,
- new ReadEntity(parseCtx.getTopToTable().get(topOp), parentViewInfo, isDirectRead));
for (Partition part : parts) {
if (part.getTable().isPartitioned()) {
@@ -882,6 +878,30 @@ public final class GenMapRedUtils {
}
}
+ public static void internTableDesc(Task<?> task, Interner<TableDesc> interner) {
+
+ if (task instanceof ConditionalTask) {
+ for (Task tsk : ((ConditionalTask) task).getListTasks()) {
+ internTableDesc(tsk, interner);
+ }
+ } else if (task instanceof ExecDriver) {
+ MapredWork work = (MapredWork) task.getWork();
+ work.getMapWork().internTable(interner);
+ } else if (task != null && (task.getWork() instanceof TezWork)) {
+ TezWork work = (TezWork)task.getWork();
+ for (BaseWork w : work.getAllWorkUnsorted()) {
+ if (w instanceof MapWork) {
+ ((MapWork)w).internTable(interner);
+ }
+ }
+ }
+ if (task.getNumChild() > 0) {
+ for (Task childTask : task.getChildTasks()) {
+ internTableDesc(childTask, interner);
+ }
+ }
+ }
+
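internTableDesc above walks a task tree and interns the TableDesc instances in every MapWork it reaches, so identical descriptors end up sharing one object. A hypothetical driver-side sketch using a Guava interner (the root-task loop and Interners.newStrongInterner() are assumptions, not part of this hunk):

    // Hypothetical usage; not part of this commit.
    Interner<TableDesc> interner = Interners.newStrongInterner();
    for (Task<? extends Serializable> rootTask : rootTasks) {
      GenMapRedUtils.internTableDesc(rootTask, interner);
    }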
/**
* create a new plan and return.
*
@@ -1507,7 +1527,7 @@ public final class GenMapRedUtils {
*
* @param fsInputDesc
* @param finalName
- * @param inputFormatClass
+ * @param inputFormatClass
* @return MergeWork if table is stored as RCFile or ORCFile,
* null otherwise
*/
@@ -1714,7 +1734,7 @@ public final class GenMapRedUtils {
// There are separate configuration parameters to control whether to
// merge for a map-only job
// or for a map-reduce job
- if (currTask.getWork() instanceof MapredWork) {
+ if (currTask.getWork() instanceof MapredWork) {
ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork();
boolean mergeMapOnly =
hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
@@ -1813,7 +1833,7 @@ public final class GenMapRedUtils {
return Collections.emptyList();
}
- public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
+ public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
throws SemanticException {
List<Path> inputPaths = new ArrayList<Path>();
switch (parseInfo.getTableSpec().specType) {
@@ -1850,6 +1870,7 @@ public final class GenMapRedUtils {
public static Set<Operator<?>> findTopOps(Operator<?> startOp, final Class<?> clazz) {
final Set<Operator<?>> operators = new LinkedHashSet<Operator<?>>();
OperatorUtils.iterateParents(startOp, new NodeUtils.Function<Operator<?>>() {
+ @Override
public void apply(Operator<?> argument) {
if (argument.getNumParent() == 0 && (clazz == null || clazz.isInstance(argument))) {
operators.add(argument);
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Sun Oct 5 22:26:43 2014
@@ -389,157 +389,8 @@ public class MapJoinProcessor implements
JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
throws SemanticException {
- JoinDesc desc = op.getConf();
- JoinCondDesc[] condns = desc.getConds();
- Byte[] tagOrder = desc.getTagOrder();
-
- // outer join cannot be performed on a table which is being cached
- if (!noCheckOuterJoin) {
- if (checkMapJoin(mapJoinPos, condns) < 0) {
- throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
- }
- }
-
- // Walk over all the sources (which are guaranteed to be reduce sink
- // operators).
- // The join outputs a concatenation of all the inputs.
- QBJoinTree leftSrc = joinTree.getJoinSrc();
- List<ReduceSinkOperator> oldReduceSinkParentOps =
- new ArrayList<ReduceSinkOperator>(op.getNumParent());
- if (leftSrc != null) {
- // assert mapJoinPos == 0;
- Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
- assert parentOp.getParentOperators().size() == 1;
- oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
- }
-
-
- byte pos = 0;
- for (String src : joinTree.getBaseSrc()) {
- if (src != null) {
- Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
- assert parentOp.getParentOperators().size() == 1;
- oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
- }
- pos++;
- }
-
- Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
- List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
- Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
- Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
- for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
- byte tag = entry.getKey();
- Operator<?> terminal = oldReduceSinkParentOps.get(tag);
-
- List<ExprNodeDesc> values = entry.getValue();
- List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
- newValueExprs.put(tag, newValues);
- for (int i = 0; i < schema.size(); i++) {
- ColumnInfo column = schema.get(i);
- if (column == null) {
- continue;
- }
- ExprNodeDesc expr = colExprMap.get(column.getInternalName());
- int index = ExprNodeDescUtils.indexOf(expr, values);
- if (index >= 0) {
- colExprMap.put(column.getInternalName(), newValues.get(index));
- schema.set(i, null);
- }
- }
- }
-
- // rewrite value index for mapjoin
- Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
-
- // get the join keys from old parent ReduceSink operators
- Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-
- // construct valueTableDescs and valueFilteredTableDescs
- List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
- List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
- int[][] filterMap = desc.getFilterMap();
- for (pos = 0; pos < op.getParentOperators().size(); pos++) {
- ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
- List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
- List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
- if (pos != mapJoinPos) {
- // remove values in key exprs for value table schema
- // value expression for hashsink will be modified in LocalMapJoinProcessor
- int[] valueIndex = new int[valueCols.size()];
- List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
- for (int i = 0; i < valueIndex.length; i++) {
- ExprNodeDesc expr = valueCols.get(i);
- int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
- if (kindex >= 0) {
- valueIndex[i] = kindex;
- } else {
- valueIndex[i] = -valueColsInValueExpr.size() - 1;
- valueColsInValueExpr.add(expr);
- }
- }
- if (needValueIndex(valueIndex)) {
- valueIndices.put(pos, valueIndex);
- }
- valueCols = valueColsInValueExpr;
- }
- // deep copy expr node desc
- List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
- if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
- ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory
- .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false);
- valueFilteredCols.add(isFilterDesc);
- }
-
- TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
- .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
- TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
- .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue"));
-
- valueTableDescs.add(valueTableDesc);
- valueFilteredTableDescs.add(valueFilteredTableDesc);
-
- keyExprMap.put(pos, keyCols);
- }
-
- Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
- Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
- for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
- byte srcTag = entry.getKey();
- List<ExprNodeDesc> filter = entry.getValue();
-
- Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
- newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
- }
- desc.setFilters(filters = newFilters);
-
- // create dumpfile prefix needed to create descriptor
- String dumpFilePrefix = "";
- if( joinTree.getMapAliases() != null ) {
- for(String mapAlias : joinTree.getMapAliases()) {
- dumpFilePrefix = dumpFilePrefix + mapAlias;
- }
- dumpFilePrefix = dumpFilePrefix+"-"+PlanUtils.getCountForMapJoinDumpFilePrefix();
- } else {
- dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix();
- }
-
- List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos);
-
- List<String> outputColumnNames = op.getConf().getOutputColumnNames();
- TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
- PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
- JoinCondDesc[] joinCondns = op.getConf().getConds();
- MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
- valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns,
- filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
- mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
- mapJoinDescriptor.setTagOrder(tagOrder);
- mapJoinDescriptor.setNullSafes(desc.getNullSafes());
- mapJoinDescriptor.setFilterMap(desc.getFilterMap());
- if (!valueIndices.isEmpty()) {
- mapJoinDescriptor.setValueIndices(valueIndices);
- }
+ MapJoinDesc mapJoinDescriptor =
+ getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin);
// reduce sink row resolver used to generate map join op
RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -551,6 +402,7 @@ public class MapJoinProcessor implements
opParseCtxMap.put(mapJoinOp, ctx);
mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
+ Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
mapJoinOp.setColumnExprMap(colExprMap);
List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
@@ -1176,4 +1028,168 @@ public class MapJoinProcessor implements
}
}
+
+ public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
+ LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+ JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+ JoinDesc desc = op.getConf();
+ JoinCondDesc[] condns = desc.getConds();
+ Byte[] tagOrder = desc.getTagOrder();
+
+ // outer join cannot be performed on a table which is being cached
+ if (!noCheckOuterJoin) {
+ if (checkMapJoin(mapJoinPos, condns) < 0) {
+ throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+ }
+ }
+
+ // Walk over all the sources (which are guaranteed to be reduce sink
+ // operators).
+ // The join outputs a concatenation of all the inputs.
+ QBJoinTree leftSrc = joinTree.getJoinSrc();
+ List<ReduceSinkOperator> oldReduceSinkParentOps =
+ new ArrayList<ReduceSinkOperator>(op.getNumParent());
+ if (leftSrc != null) {
+ // assert mapJoinPos == 0;
+ Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
+ assert parentOp.getParentOperators().size() == 1;
+ oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+ }
+
+ byte pos = 0;
+ for (String src : joinTree.getBaseSrc()) {
+ if (src != null) {
+ Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
+ assert parentOp.getParentOperators().size() == 1;
+ oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+ }
+ pos++;
+ }
+
+ Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
+ List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
+ Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
+ Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
+ for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
+ byte tag = entry.getKey();
+ Operator<?> terminal = oldReduceSinkParentOps.get(tag);
+
+ List<ExprNodeDesc> values = entry.getValue();
+ List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
+ newValueExprs.put(tag, newValues);
+ for (int i = 0; i < schema.size(); i++) {
+ ColumnInfo column = schema.get(i);
+ if (column == null) {
+ continue;
+ }
+ ExprNodeDesc expr = colExprMap.get(column.getInternalName());
+ int index = ExprNodeDescUtils.indexOf(expr, values);
+ if (index >= 0) {
+ colExprMap.put(column.getInternalName(), newValues.get(index));
+ schema.set(i, null);
+ }
+ }
+ }
+
+ // rewrite value index for mapjoin
+ Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
+
+ // get the join keys from old parent ReduceSink operators
+ Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
+ // construct valueTableDescs and valueFilteredTableDescs
+ List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
+ List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
+ int[][] filterMap = desc.getFilterMap();
+ for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+ ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
+ List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
+ List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+ if (pos != mapJoinPos) {
+ // remove values in key exprs for value table schema
+ // value expression for hashsink will be modified in
+ // LocalMapJoinProcessor
+ int[] valueIndex = new int[valueCols.size()];
+ List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
+ for (int i = 0; i < valueIndex.length; i++) {
+ ExprNodeDesc expr = valueCols.get(i);
+ int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
+ if (kindex >= 0) {
+ valueIndex[i] = kindex;
+ } else {
+ valueIndex[i] = -valueColsInValueExpr.size() - 1;
+ valueColsInValueExpr.add(expr);
+ }
+ }
+ if (needValueIndex(valueIndex)) {
+ valueIndices.put(pos, valueIndex);
+ }
+ valueCols = valueColsInValueExpr;
+ }
+ // deep copy expr node desc
+ List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
+ if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
+ ExprNodeColumnDesc isFilterDesc =
+ new ExprNodeColumnDesc(
+ TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter",
+ "filter", false);
+ valueFilteredCols.add(isFilterDesc);
+ }
+
+ TableDesc valueTableDesc =
+ PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols,
+ "mapjoinvalue"));
+ TableDesc valueFilteredTableDesc =
+ PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
+ valueFilteredCols, "mapjoinvalue"));
+
+ valueTableDescs.add(valueTableDesc);
+ valueFilteredTableDescs.add(valueFilteredTableDesc);
+
+ keyExprMap.put(pos, keyCols);
+ }
+
+ Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
+ Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
+ for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
+ byte srcTag = entry.getKey();
+ List<ExprNodeDesc> filter = entry.getValue();
+
+ Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
+ newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
+ }
+ desc.setFilters(filters = newFilters);
+
+ // create dumpfile prefix needed to create descriptor
+ String dumpFilePrefix = "";
+ if (joinTree.getMapAliases() != null) {
+ for (String mapAlias : joinTree.getMapAliases()) {
+ dumpFilePrefix = dumpFilePrefix + mapAlias;
+ }
+ dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+ } else {
+ dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+ }
+
+ List<ExprNodeDesc> keyCols = keyExprMap.get((byte) mapJoinPos);
+
+ List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+ TableDesc keyTableDesc =
+ PlanUtils.getMapJoinKeyTableDesc(hconf,
+ PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+ JoinCondDesc[] joinCondns = op.getConf().getConds();
+ MapJoinDesc mapJoinDescriptor =
+ new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
+ valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op
+ .getConf().getNoOuterJoin(), dumpFilePrefix);
+ mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
+ mapJoinDescriptor.setTagOrder(tagOrder);
+ mapJoinDescriptor.setNullSafes(desc.getNullSafes());
+ mapJoinDescriptor.setFilterMap(desc.getFilterMap());
+ if (!valueIndices.isEmpty()) {
+ mapJoinDescriptor.setValueIndices(valueIndices);
+ }
+
+ return mapJoinDescriptor;
+ }
}
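
A side note on the refactored getMapJoinDesc above: for each small-table input, the valueIndex array records, per value column, either the position of the matching join key (a non-negative index) or -(position in the trimmed value list) - 1 for columns that genuinely have to travel in the value row; the map is only kept when needValueIndex decides the mapping is worth recording, and the comment in the loop notes that the hash-sink side is rewritten later in LocalMapJoinProcessor. A minimal, self-contained sketch of that encoding, using plain strings in place of ExprNodeDesc (illustration only, not Hive API):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ValueIndexSketch {
      public static void main(String[] args) {
        List<String> keyCols = Arrays.asList("k1", "k2");
        List<String> valueCols = Arrays.asList("k2", "v1", "k1", "v2");

        int[] valueIndex = new int[valueCols.size()];
        List<String> trimmedValues = new ArrayList<String>();
        for (int i = 0; i < valueCols.size(); i++) {
          int kindex = keyCols.indexOf(valueCols.get(i));
          if (kindex >= 0) {
            valueIndex[i] = kindex;                    // resolve this column from the key row
          } else {
            valueIndex[i] = -trimmedValues.size() - 1; // resolve it from the trimmed value row
            trimmedValues.add(valueCols.get(i));
          }
        }
        System.out.println(Arrays.toString(valueIndex)); // [1, -1, 0, -2]
        System.out.println(trimmedValues);               // [v1, v2]
      }
    }
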
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Sun Oct 5 22:26:43 2014
@@ -51,7 +51,12 @@ public class Optimizer {
* @param hiveConf
*/
public void initialize(HiveConf hiveConf) {
+
+ boolean isTezExecEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+ boolean bucketMapJoinOptimizer = false;
+
transformations = new ArrayList<Transform>();
+
// Add the transformation that computes the lineage information.
transformations.add(new Generator());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
@@ -81,15 +86,16 @@ public class Optimizer {
}
transformations.add(new SamplePruner());
transformations.add(new MapJoinProcessor());
- boolean bucketMapJoinOptimizer = false;
- if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
+
+ if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) && !isTezExecEngine) {
transformations.add(new BucketMapJoinOptimizer());
bucketMapJoinOptimizer = true;
}
// If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both
// BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
- if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
+ if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN))
+ && !isTezExecEngine) {
if (!bucketMapJoinOptimizer) {
// No need to add BucketMapJoinOptimizer twice
transformations.add(new BucketMapJoinOptimizer());
@@ -119,7 +125,7 @@ public class Optimizer {
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) &&
- !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ !isTezExecEngine) {
transformations.add(new CorrelationOptimizer());
}
if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
@@ -128,8 +134,7 @@ public class Optimizer {
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
transformations.add(new StatsOptimizer());
}
- String execEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
- if ((pctx.getContext().getExplain() || "spark".equals(execEngine)) && !"tez".equals(execEngine)) {
+ if (pctx.getContext().getExplain() && !isTezExecEngine) {
transformations.add(new AnnotateWithStatistics());
transformations.add(new AnnotateWithOpTraits());
}
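
The Optimizer change above hoists the HIVE_EXECUTION_ENGINE lookup into a single isTezExecEngine flag and uses it to skip both bucket-map-join passes under Tez; the same flag also replaces the per-site getVar calls previously used by the correlation-optimizer and explain checks. A reduced sketch of that gating pattern, with placeholder transform classes rather than the real Hive ones:

    import java.util.ArrayList;
    import java.util.List;

    public class EngineGatingSketch {
      interface Transform { }
      static class BucketMapJoinOptimizer implements Transform { }
      static class SortedMergeBucketMapJoinOptimizer implements Transform { }

      // Mirrors the shape of Optimizer.initialize: both bucket-map-join passes
      // are skipped when the engine is Tez, and BucketMapJoinOptimizer is
      // registered at most once.
      static List<Transform> buildTransforms(String engine,
                                             boolean optBucketMapJoin,
                                             boolean optSortMergeBucketMapJoin) {
        boolean isTezExecEngine = "tez".equals(engine);
        boolean bucketMapJoinOptimizer = false;
        List<Transform> transformations = new ArrayList<Transform>();
        if (optBucketMapJoin && !isTezExecEngine) {
          transformations.add(new BucketMapJoinOptimizer());
          bucketMapJoinOptimizer = true;
        }
        if (optSortMergeBucketMapJoin && !isTezExecEngine) {
          if (!bucketMapJoinOptimizer) {
            transformations.add(new BucketMapJoinOptimizer());
          }
          transformations.add(new SortedMergeBucketMapJoinOptimizer());
        }
        return transformations;
      }

      public static void main(String[] args) {
        System.out.println(buildTransforms("tez", true, true).size()); // 0
        System.out.println(buildTransforms("mr", true, true).size());  // 2
      }
    }
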
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Sun Oct 5 22:26:43 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
public class ReduceSinkMapJoinProc implements NodeProcessor {
@@ -183,7 +184,10 @@ public class ReduceSinkMapJoinProc imple
TezWork tezWork = context.currentTask.getWork();
LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
tezWork.connect(parentWork, myWork, edgeProp);
-
+ if (edgeType == EdgeType.CUSTOM_EDGE) {
+ tezWork.setVertexType(myWork, VertexType.INITIALIZED_EDGES);
+ }
+
ReduceSinkOperator r = null;
if (parentRS.getConf().getOutputName() != null) {
LOG.debug("Cloning reduce sink for multi-child broadcast edge");
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Sun Oct 5 22:26:43 2014
@@ -44,9 +44,9 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -55,13 +55,25 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.QB;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -73,9 +85,11 @@ public class SimpleFetchOptimizer implem
private final Log LOG = LogFactory.getLog(SimpleFetchOptimizer.class.getName());
+ @Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
- if (pctx.getQB().isSimpleSelectQuery() && topOps.size() == 1) {
+ if (pctx.getQB().getIsQuery() && !pctx.getQB().getParseInfo().isAnalyzeCommand()
+ && topOps.size() == 1) {
// no join, no groupby, no distinct, no lateral view, no subq,
// no CTAS or insert, not analyze command, and single sourced.
String alias = (String) pctx.getTopOps().keySet().toArray()[0];
@@ -144,7 +158,7 @@ public class SimpleFetchOptimizer implem
// for non-aggressive mode (minimal)
// 1. sampling is not allowed
// 2. for partitioned table, all filters should be targeted to partition column
- // 3. SelectOperator should be select star
+ // 3. SelectOperator should use only simple cast/column access
private FetchData checkTree(boolean aggressive, ParseContext pctx, String alias,
TableScanOperator ts) throws HiveException {
SplitSample splitSample = pctx.getNameToSplitSample().get(alias);
@@ -156,7 +170,7 @@ public class SimpleFetchOptimizer implem
return null;
}
- Table table = qb.getMetaData().getAliasToTable().get(alias);
+ Table table = pctx.getTopToTable().get(ts);
if (table == null) {
return null;
}
@@ -181,34 +195,71 @@ public class SimpleFetchOptimizer implem
return null;
}
- private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggresive,
+ private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggressive,
boolean bypassFilter) {
if (ts.getChildOperators().size() != 1) {
return null;
}
Operator<?> op = ts.getChildOperators().get(0);
for (; ; op = op.getChildOperators().get(0)) {
- if (aggresive) {
- if (!(op instanceof LimitOperator || op instanceof FilterOperator
- || op instanceof SelectOperator)) {
+ if (op instanceof SelectOperator) {
+ if (!aggressive) {
+ if (!checkExpressions((SelectOperator) op)) {
+ break;
+ }
+ }
+ continue;
+ }
+
+ if (aggressive) {
+ if (!(op instanceof LimitOperator || op instanceof FilterOperator)) {
break;
}
- } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)
- || (op instanceof SelectOperator && ((SelectOperator) op).getConf().isSelectStar()))) {
+ } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) {
break;
}
+
if (op.getChildOperators() == null || op.getChildOperators().size() != 1) {
return null;
}
}
+
if (op instanceof FileSinkOperator) {
fetch.scanOp = ts;
fetch.fileSink = op;
return fetch;
}
+
return null;
}
+ private boolean checkExpressions(SelectOperator op) {
+ SelectDesc desc = op.getConf();
+ for (ExprNodeDesc expr : desc.getColList()) {
+ if (!checkExpression(expr)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean checkExpression(ExprNodeDesc expr) {
+ if (expr instanceof ExprNodeConstantDesc || expr instanceof ExprNodeColumnDesc) {
+ return true;
+ }
+
+ if (expr instanceof ExprNodeGenericFuncDesc) {
+ GenericUDF udf = ((ExprNodeGenericFuncDesc) expr).getGenericUDF();
+ if (udf instanceof GenericUDFToBinary || udf instanceof GenericUDFToChar
+ || udf instanceof GenericUDFToDate || udf instanceof GenericUDFToDecimal
+ || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp
+ || udf instanceof GenericUDFToVarchar) {
+ return expr.getChildren().size() == 1 && checkExpression(expr.getChildren().get(0));
+ }
+ }
+ return false;
+ }
+
private class FetchData {
private final ReadEntity parent;
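
For the non-aggressive fetch path above, checkExpressions/checkExpression only accept constants, plain column references, and the listed cast UDFs applied to exactly one acceptable child. A self-contained sketch of that whitelist logic, using a simplified expression model instead of ExprNodeDesc (names here are illustrative, not the Hive classes):

    import java.util.Arrays;
    import java.util.List;

    public class CastWhitelistSketch {
      // Stand-in for constant / column / generic-function expression nodes:
      // "const", "column", or a function name, plus children.
      static class Expr {
        final String kind;
        final List<Expr> children;
        Expr(String kind, Expr... children) {
          this.kind = kind;
          this.children = Arrays.asList(children);
        }
      }

      // Casts that keep the query eligible for the simple fetch path.
      static final List<String> CAST_WHITELIST = Arrays.asList(
          "toBinary", "toChar", "toDate", "toDecimal",
          "toUnixTimeStamp", "toUtcTimestamp", "toVarchar");

      static boolean checkExpression(Expr expr) {
        if (expr.kind.equals("const") || expr.kind.equals("column")) {
          return true;
        }
        // Only a whitelisted cast over exactly one acceptable child passes.
        return CAST_WHITELIST.contains(expr.kind)
            && expr.children.size() == 1
            && checkExpression(expr.children.get(0));
      }

      public static void main(String[] args) {
        Expr ok = new Expr("toDate", new Expr("column"));  // cast(col as date)
        Expr bad = new Expr("upper", new Expr("column"));  // arbitrary UDF
        System.out.println(checkExpression(ok));   // true
        System.out.println(checkExpression(bad));  // false
      }
    }
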
@@ -240,7 +291,7 @@ public class SimpleFetchOptimizer implem
this.splitSample = splitSample;
this.onlyPruningFilter = bypassFilter;
}
-
+
/*
* all filters were executed during partition pruning
*/
@@ -251,7 +302,7 @@ public class SimpleFetchOptimizer implem
private FetchWork convertToWork() throws HiveException {
inputs.clear();
if (!table.isPartitioned()) {
- inputs.add(new ReadEntity(table, parent));
+ inputs.add(new ReadEntity(table, parent, parent == null));
FetchWork work = new FetchWork(table.getPath(), Utilities.getTableDesc(table));
PlanUtils.configureInputJobPropertiesForStorageHandler(work.getTblDesc());
work.setSplitSample(splitSample);
@@ -261,12 +312,12 @@ public class SimpleFetchOptimizer implem
List<PartitionDesc> partP = new ArrayList<PartitionDesc>();
for (Partition partition : partsList.getNotDeniedPartns()) {
- inputs.add(new ReadEntity(partition, parent));
+ inputs.add(new ReadEntity(partition, parent, parent == null));
listP.add(partition.getDataLocation());
partP.add(Utilities.getPartitionDesc(partition));
}
Table sourceTable = partsList.getSourceTable();
- inputs.add(new ReadEntity(sourceTable, parent));
+ inputs.add(new ReadEntity(sourceTable, parent, parent == null));
TableDesc table = Utilities.getTableDesc(sourceTable);
FetchWork work = new FetchWork(listP, partP, table);
if (!work.getPartDesc().isEmpty()) {