Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/26 21:16:13 UTC
svn commit: r1476348 [9/29] - in /hive/branches/vectorization: ./ beeline/
beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/
beeline/src/test/org/apache/ beeline/src/test/org/apache/hive/
beeline/src/test/org/apache/hive/beeline/ beeline/...
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java Fri Apr 26 19:14:49 2013
@@ -38,7 +38,7 @@ public class RCFileKeyBufferWrapper impl
protected CompressionCodec codec;
- protected RCFileKeyBufferWrapper() {
+ public RCFileKeyBufferWrapper() {
}
public static RCFileKeyBufferWrapper create(KeyBuffer currentKeyBufferObj) {
@@ -66,4 +66,48 @@ public class RCFileKeyBufferWrapper impl
return keyBuffer;
}
+ public void setKeyBuffer(KeyBuffer keyBuffer) {
+ this.keyBuffer = keyBuffer;
+ }
+
+ public int getRecordLength() {
+ return recordLength;
+ }
+
+ public void setRecordLength(int recordLength) {
+ this.recordLength = recordLength;
+ }
+
+ public int getKeyLength() {
+ return keyLength;
+ }
+
+ public void setKeyLength(int keyLength) {
+ this.keyLength = keyLength;
+ }
+
+ public int getCompressedKeyLength() {
+ return compressedKeyLength;
+ }
+
+ public void setCompressedKeyLength(int compressedKeyLength) {
+ this.compressedKeyLength = compressedKeyLength;
+ }
+
+ public Path getInputPath() {
+ return inputPath;
+ }
+
+ public void setInputPath(Path inputPath) {
+ this.inputPath = inputPath;
+ }
+
+ public CompressionCodec getCodec() {
+ return codec;
+ }
+
+ public void setCodec(CompressionCodec codec) {
+ this.codec = codec;
+ }
+
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java Fri Apr 26 19:14:49 2013
@@ -48,4 +48,12 @@ public class RCFileValueBufferWrapper im
return this.valueBuffer.compareTo(o.valueBuffer);
}
+ public ValueBuffer getValueBuffer() {
+ return valueBuffer;
+ }
+
+ public void setValueBuffer(ValueBuffer valueBuffer) {
+ this.valueBuffer = valueBuffer;
+ }
+
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java Fri Apr 26 19:14:49 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.laz
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -87,6 +88,11 @@ public class DefaultStorageHandler imple
}
@Override
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+ //do nothing by default
+ }
+
+ @Override
public Configuration getConf() {
return conf;
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Fri Apr 26 19:14:49 2013
@@ -76,6 +76,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Role;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
@@ -1220,8 +1221,8 @@ public class Hive {
org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
/* Construct list bucketing location mappings from sub-directory name. */
- Map<List<String>, String> skewedColValueLocationMaps = constructListBucketingLocationMap(
- newPartPath, skewedInfo);
+ Map<SkewedValueList, String> skewedColValueLocationMaps =
+ constructListBucketingLocationMap(newPartPath, skewedInfo);
/* Add list bucketing location mappings. */
skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
@@ -1254,7 +1255,8 @@ public class Hive {
* @throws IOException
*/
private void walkDirTree(FileStatus fSta, FileSystem fSys,
- Map<List<String>, String> skewedColValueLocationMaps, Path newPartPath, SkewedInfo skewedInfo)
+ Map<SkewedValueList, String> skewedColValueLocationMaps,
+ Path newPartPath, SkewedInfo skewedInfo)
throws IOException {
/* Base Case. It's leaf. */
if (!fSta.isDir()) {
@@ -1280,7 +1282,7 @@ private void walkDirTree(FileStatus fSta
* @param skewedInfo
*/
private void constructOneLBLocationMap(FileStatus fSta,
- Map<List<String>, String> skewedColValueLocationMaps,
+ Map<SkewedValueList, String> skewedColValueLocationMaps,
Path newPartPath, SkewedInfo skewedInfo) {
Path lbdPath = fSta.getPath().getParent();
List<String> skewedValue = new ArrayList<String>();
@@ -1303,7 +1305,7 @@ private void constructOneLBLocationMap(F
}
if ((skewedValue.size() > 0) && (skewedValue.size() == skewedInfo.getSkewedColNames().size())
&& !skewedColValueLocationMaps.containsKey(skewedValue)) {
- skewedColValueLocationMaps.put(skewedValue, lbdPath.toString());
+ skewedColValueLocationMaps.put(new SkewedValueList(skewedValue), lbdPath.toString());
}
}
@@ -1316,9 +1318,10 @@ private void constructOneLBLocationMap(F
* @throws IOException
* @throws FileNotFoundException
*/
- private Map<List<String>, String> constructListBucketingLocationMap(Path newPartPath,
+ private Map<SkewedValueList, String> constructListBucketingLocationMap(Path newPartPath,
SkewedInfo skewedInfo) throws IOException, FileNotFoundException {
- Map<List<String>, String> skewedColValueLocationMaps = new HashMap<List<String>, String>();
+ Map<SkewedValueList, String> skewedColValueLocationMaps =
+ new HashMap<SkewedValueList, String>();
FileSystem fSys = newPartPath.getFileSystem(conf);
walkDirTree(fSys.getFileStatus(newPartPath), fSys, skewedColValueLocationMaps, newPartPath,
skewedInfo);
@@ -1668,7 +1671,7 @@ private void constructOneLBLocationMap(F
List<String> names = null;
Table t = getTable(dbName, tblName);
- List<String> pvals = getPvals(t.getPartCols(), partSpec);
+ List<String> pvals = MetaStoreUtils.getPvals(t.getPartCols(), partSpec);
try {
names = getMSC().listPartitionNames(dbName, tblName, pvals, max);
@@ -1710,19 +1713,6 @@ private void constructOneLBLocationMap(F
}
}
- public static List<String> getPvals(List<FieldSchema> partCols,
- Map<String, String> partSpec) {
- List<String> pvals = new ArrayList<String>();
- for (FieldSchema field : partCols) {
- String val = partSpec.get(field.getName());
- if (val == null) {
- val = "";
- }
- pvals.add(val);
- }
- return pvals;
- }
-
/**
* get all the partitions of the table that matches the given partial
* specification. partition columns whose value is can be anything should be
@@ -1742,7 +1732,7 @@ private void constructOneLBLocationMap(F
"partitioned table");
}
- List<String> partialPvals = getPvals(tbl.getPartCols(), partialPartSpec);
+ List<String> partialPvals = MetaStoreUtils.getPvals(tbl.getPartCols(), partialPartSpec);
List<org.apache.hadoop.hive.metastore.api.Partition> partitions = null;
try {
@@ -2248,6 +2238,18 @@ private void constructOneLBLocationMap(F
}
}
+ public void exchangeTablePartitions(Map<String, String> partitionSpecs,
+ String sourceDb, String sourceTable, String destDb,
+ String destinationTableName) throws HiveException {
+ try {
+ getMSC().exchange_partition(partitionSpecs, sourceDb, sourceTable, destDb,
+ destinationTableName);
+ } catch (Exception ex) {
+ LOG.error(StringUtils.stringifyException(ex));
+ throw new HiveException(ex);
+ }
+ }
+
/**
* Creates a metastore client. Currently it creates only JDBC based client as
* File based store support is removed
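
The new exchangeTablePartitions() wrapper above simply forwards to the metastore client's exchange_partition call and rethrows any failure as HiveException. A minimal caller sketch, assuming Hive.get(conf) supplies the metadata client; the database, table, and partition names below are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class ExchangePartitionExample {
      public static void main(String[] args) throws HiveException {
        Hive db = Hive.get(new HiveConf());
        // Partition column -> value; the spec identifies the partition to move.
        Map<String, String> partitionSpecs = new HashMap<String, String>();
        partitionSpecs.put("ds", "2013-04-26");
        // Moves the matching partition from source_db.source_tbl into dest_db.dest_tbl
        // via the metastore's exchange_partition call.
        db.exchangeTablePartitions(partitionSpecs, "source_db", "source_tbl",
            "dest_db", "dest_tbl");
      }
    }
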
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java Fri Apr 26 19:14:49 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
/**
@@ -133,4 +134,12 @@ public interface HiveStorageHandler exte
public void configureTableJobProperties(
TableDesc tableDesc,
Map<String, String> jobProperties);
+
+ /**
+ * Called just before submitting the MapReduce job.
+ *
+ * @param tableDesc descriptor for the table being accessed
+ * @param jobConf JobConf for the MapReduce job
+ */
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf);
}
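
configureJobConf() gives a storage handler one last chance to adjust the JobConf before the MapReduce job is submitted; DefaultStorageHandler's implementation above is a no-op. A minimal sketch of a handler overriding the hook, where the class name and property key are hypothetical:

    import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
    import org.apache.hadoop.hive.ql.plan.TableDesc;
    import org.apache.hadoop.mapred.JobConf;

    public class ExampleStorageHandler extends DefaultStorageHandler {
      @Override
      public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
        // Copy a table property into the job configuration so tasks can read it.
        String endpoint = tableDesc.getProperties().getProperty("example.endpoint");
        if (endpoint != null) {
          jobConf.set("example.endpoint", endpoint);
        }
      }
    }
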
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Fri Apr 26 19:14:49 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -647,18 +648,18 @@ public class Partition implements Serial
public void setSkewedValueLocationMap(List<String> valList, String dirName)
throws HiveException {
- Map<List<String>, String> mappings = tPartition.getSd().getSkewedInfo()
+ Map<SkewedValueList, String> mappings = tPartition.getSd().getSkewedInfo()
.getSkewedColValueLocationMaps();
if (null == mappings) {
- mappings = new HashMap<List<String>, String>();
+ mappings = new HashMap<SkewedValueList, String>();
tPartition.getSd().getSkewedInfo().setSkewedColValueLocationMaps(mappings);
}
// Add or update new mapping
- mappings.put(valList, dirName);
+ mappings.put(new SkewedValueList(valList), dirName);
}
- public Map<List<String>, String> getSkewedColValueLocationMaps() {
+ public Map<SkewedValueList, String> getSkewedColValueLocationMaps() {
return tPartition.getSd().getSkewedInfo().getSkewedColValueLocationMaps();
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Fri Apr 26 19:14:49 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -144,7 +145,7 @@ public class Table implements Serializab
SkewedInfo skewInfo = new SkewedInfo();
skewInfo.setSkewedColNames(new ArrayList<String>());
skewInfo.setSkewedColValues(new ArrayList<List<String>>());
- skewInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
+ skewInfo.setSkewedColValueLocationMaps(new HashMap<SkewedValueList, String>());
sd.setSkewedInfo(skewInfo);
}
@@ -518,20 +519,20 @@ public class Table implements Serializab
public void setSkewedValueLocationMap(List<String> valList, String dirName)
throws HiveException {
- Map<List<String>, String> mappings = tTable.getSd().getSkewedInfo()
+ Map<SkewedValueList, String> mappings = tTable.getSd().getSkewedInfo()
.getSkewedColValueLocationMaps();
if (null == mappings) {
- mappings = new HashMap<List<String>, String>();
+ mappings = new HashMap<SkewedValueList, String>();
tTable.getSd().getSkewedInfo().setSkewedColValueLocationMaps(mappings);
}
// Add or update new mapping
- mappings.put(valList, dirName);
+ mappings.put(new SkewedValueList(valList), dirName);
}
- public Map<List<String>,String> getSkewedColValueLocationMaps() {
+ public Map<SkewedValueList,String> getSkewedColValueLocationMaps() {
return (tTable.getSd().getSkewedInfo() != null) ? tTable.getSd().getSkewedInfo()
- .getSkewedColValueLocationMaps() : new HashMap<List<String>, String>();
+ .getSkewedColValueLocationMaps() : new HashMap<SkewedValueList, String>();
}
public void setSkewedColValues(List<List<String>> skewedValues) throws HiveException {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java Fri Apr 26 19:14:49 2013
@@ -31,6 +31,7 @@ import org.apache.commons.lang.StringEsc
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.index.HiveIndex;
import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType;
@@ -62,16 +63,22 @@ public final class MetaDataFormatUtils {
columnInformation.append(LINE_DELIM);
}
- public static String getAllColumnsInformation(List<FieldSchema> cols) {
+ public static String getAllColumnsInformation(List<FieldSchema> cols,
+ boolean printHeader) {
StringBuilder columnInformation = new StringBuilder(DEFAULT_STRINGBUILDER_SIZE);
- formatColumnsHeader(columnInformation);
+ if(printHeader){
+ formatColumnsHeader(columnInformation);
+ }
formatAllFields(columnInformation, cols);
return columnInformation.toString();
}
- public static String getAllColumnsInformation(List<FieldSchema> cols, List<FieldSchema> partCols) {
+ public static String getAllColumnsInformation(List<FieldSchema> cols, List<FieldSchema> partCols,
+ boolean printHeader) {
StringBuilder columnInformation = new StringBuilder(DEFAULT_STRINGBUILDER_SIZE);
- formatColumnsHeader(columnInformation);
+ if(printHeader){
+ formatColumnsHeader(columnInformation);
+ }
formatAllFields(columnInformation, cols);
if ((partCols != null) && (!partCols.isEmpty())) {
@@ -193,7 +200,7 @@ public final class MetaDataFormatUtils {
formatOutput("Skewed Values:", skewedColValues.toString(), tableInfo);
}
- Map<List<String>, String> skewedColMap = storageDesc.getSkewedInfo()
+ Map<SkewedValueList, String> skewedColMap = storageDesc.getSkewedInfo()
.getSkewedColValueLocationMaps();
if ((skewedColMap!=null) && (skewedColMap.size() > 0)) {
formatOutput("Skewed Value to Path:", skewedColMap.toString(),
@@ -201,9 +208,8 @@ public final class MetaDataFormatUtils {
Map<List<String>, String> truncatedSkewedColMap = new HashMap<List<String>, String>();
// walk through existing map to truncate path so that test won't mask it
// then we can verify location is right
- Set<Entry<List<String>, String>> entries = skewedColMap.entrySet();
- for (Entry<List<String>, String> entry : entries) {
- truncatedSkewedColMap.put(entry.getKey(),
+ for (Entry<SkewedValueList, String> entry : skewedColMap.entrySet()) {
+ truncatedSkewedColMap.put(entry.getKey().getSkewedValueList(),
PlanUtils.removePrefixFromWarehouseConfig(entry.getValue()));
}
formatOutput("Skewed Value to Truncated Path:",
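
With the new printHeader flag, getAllColumnsInformation() emits the column-header block only when formatted output was requested. A small usage sketch, using hypothetical column metadata:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;

    public class DescribeColumnsExample {
      public static void main(String[] args) {
        List<FieldSchema> cols = Arrays.asList(
            new FieldSchema("key", "string", "row key"),
            new FieldSchema("value", "int", "measure"));
        // Header block is printed only when formatted output is requested.
        System.out.print(MetaDataFormatUtils.getAllColumnsInformation(cols, true));
        System.out.print(MetaDataFormatUtils.getAllColumnsInformation(cols, false));
      }
    }
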
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java Fri Apr 26 19:14:49 2013
@@ -19,13 +19,14 @@
package org.apache.hadoop.hive.ql.metadata.formatting;
import java.io.DataOutputStream;
-import java.io.OutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -143,10 +144,11 @@ public class TextMetaDataFormatter imple
MetaDataPrettyFormatUtils.getAllColumnsInformation(
cols, partCols, prettyOutputNumCols)
:
- MetaDataFormatUtils.getAllColumnsInformation(cols, partCols)
+ MetaDataFormatUtils.getAllColumnsInformation(cols, partCols, isFormatted)
);
} else {
- outStream.writeBytes(MetaDataFormatUtils.getAllColumnsInformation(cols));
+ outStream.writeBytes(
+ MetaDataFormatUtils.getAllColumnsInformation(cols, isFormatted));
}
if (tableName.equals(colPath)) {
@@ -455,11 +457,13 @@ public class TextMetaDataFormatter imple
try {
outStream.writeBytes(database);
outStream.write(separator);
- if (comment != null)
- outStream.writeBytes(comment);
+ if (comment != null) {
+ outStream.writeBytes(comment);
+ }
outStream.write(separator);
- if (location != null)
- outStream.writeBytes(location);
+ if (location != null) {
+ outStream.writeBytes(location);
+ }
outStream.write(separator);
if (params != null && !params.isEmpty()) {
outStream.writeBytes(params.toString());
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Fri Apr 26 19:14:49 2013
@@ -390,11 +390,10 @@ abstract public class AbstractSMBJoinPro
// Can the join operator be converted to a sort-merge join operator ?
// It is already verified that the join can be converted to a bucket map join
protected boolean checkConvertJoinToSMBJoin(
- JoinOperator joinOperator,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext pGraphContext) throws SemanticException {
+ JoinOperator joinOperator,
+ SortBucketJoinProcCtx smbJoinContext,
+ ParseContext pGraphContext) throws SemanticException {
- boolean tableEligibleForBucketedSortMergeJoin = true;
QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
if (joinCtx == null) {
@@ -409,14 +408,15 @@ abstract public class AbstractSMBJoinPro
List<Order> sortColumnsFirstTable = new ArrayList<Order>();
for (int pos = 0; pos < srcs.length; pos++) {
- tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin &&
- isEligibleForBucketSortMergeJoin(smbJoinContext,
- pGraphContext,
- smbJoinContext.getKeyExprMap().get((byte)pos),
- joinCtx,
- srcs,
- pos,
- sortColumnsFirstTable);
+ if (!isEligibleForBucketSortMergeJoin(smbJoinContext,
+ pGraphContext,
+ smbJoinContext.getKeyExprMap().get((byte) pos),
+ joinCtx,
+ srcs,
+ pos,
+ sortColumnsFirstTable)) {
+ return false;
+ }
}
smbJoinContext.setSrcs(srcs);
@@ -472,6 +472,10 @@ abstract public class AbstractSMBJoinPro
(BigTableSelectorForAutoSMJ) ReflectionUtils.newInstance(bigTableMatcherClass, null);
int bigTablePosition =
bigTableMatcher.getBigTablePosition(pGraphContext, joinOp);
+ if (bigTablePosition < 0) {
+ // contains aliases from sub-query
+ return false;
+ }
context.setBigTablePosition(bigTablePosition);
String joinAlias =
bigTablePosition == 0 ?
@@ -489,9 +493,12 @@ abstract public class AbstractSMBJoinPro
}
context.setKeyExprMap(keyExprMap);
- String[] srcs = joinCtx.getBaseSrc();
- for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
- srcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), srcs[srcPos]);
+ // Make a deep copy of the aliases so that they are not changed in the context
+ String[] joinSrcs = joinCtx.getBaseSrc();
+ String[] srcs = new String[joinSrcs.length];
+ for (int srcPos = 0; srcPos < joinSrcs.length; srcPos++) {
+ joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), joinSrcs[srcPos]);
+ srcs[srcPos] = new String(joinSrcs[srcPos]);
}
// Given a candidate map-join, can this join be converted.
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java Fri Apr 26 19:14:49 2013
@@ -57,6 +57,9 @@ public class AvgPartitionSizeBasedBigTab
getListTopOps(joinOp, topOps);
int currentPos = 0;
for (TableScanOperator topOp : topOps) {
+ if (topOp == null) {
+ return -1;
+ }
int numPartitions = 1; // in case the sizes match, preference is
// given to the table with fewer partitions
Table table = parseCtx.getTopToTable().get(topOp);
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Fri Apr 26 19:14:49 2013
@@ -249,6 +249,7 @@ public class BucketingSortingReduceSinkO
fsOp.getConf().setMultiFileSpray(false);
fsOp.getConf().setTotalFiles(1);
fsOp.getConf().setNumFiles(1);
+ fsOp.getConf().setRemovedReduceSinkBucketSort(true);
tsOp.setUseBucketizedHiveInputFormat(true);
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Fri Apr 26 19:14:49 2013
@@ -204,7 +204,9 @@ public class GenMRFileSink1 implements N
(MapredWork) currTask.getWork(), false, ctx);
}
- if (!rootTasks.contains(currTask)) {
+ if (!rootTasks.contains(currTask)
+ && (currTask.getParentTasks() == null
+ || currTask.getParentTasks().isEmpty())) {
rootTasks.add(currTask);
}
}
@@ -721,7 +723,9 @@ public class GenMRFileSink1 implements N
(MapredWork) currTask.getWork(), false, ctx);
}
opTaskMap.put(null, currTask);
- if (!rootTasks.contains(currTask)) {
+ if (!rootTasks.contains(currTask)
+ && (currTask.getParentTasks() == null
+ || currTask.getParentTasks().isEmpty())) {
rootTasks.add(currTask);
}
} else {
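
The guard added above (and repeated in GenMRUnion1, GenMapRedUtils, and MapJoinFactory below) registers a task as a root task only when it is not already in the list and has no parent tasks. A compact restatement of that check as an illustrative helper, not a method introduced by this commit:

    import java.io.Serializable;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.Task;

    public class RootTaskGuardExample {
      // Add currTask to the plan's root tasks only if it is not already there
      // and has no parent tasks (i.e. it really is a root of the task DAG).
      public static void addIfRoot(List<Task<? extends Serializable>> rootTasks,
          Task<? extends Serializable> currTask) {
        if (!rootTasks.contains(currTask)
            && (currTask.getParentTasks() == null
                || currTask.getParentTasks().isEmpty())) {
          rootTasks.add(currTask);
        }
      }
    }
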
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Fri Apr 26 19:14:49 2013
@@ -259,12 +259,17 @@ public class GenMRUnion1 implements Node
// Copy into the current union task plan if
if (uPrsCtx.getMapOnlySubq(pos) && uPrsCtx.getRootTask(pos)) {
processSubQueryUnionMerge(ctx, uCtxTask, union, stack);
+ if (ctx.getRootTasks().contains(currTask)) {
+ ctx.getRootTasks().remove(currTask);
+ }
}
// If it a map-reduce job, create a temporary file
else {
// is the current task a root task
if (shouldBeRootTask(currTask)
- && (!ctx.getRootTasks().contains(currTask))) {
+ && !ctx.getRootTasks().contains(currTask)
+ && (currTask.getParentTasks() == null
+ || currTask.getParentTasks().isEmpty())) {
ctx.getRootTasks().add(currTask);
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Fri Apr 26 19:14:49 2013
@@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -106,7 +108,9 @@ public final class GenMapRedUtils {
List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
- if (!rootTasks.contains(currTask)) {
+ if (!rootTasks.contains(currTask)
+ && (currTask.getParentTasks() == null
+ || currTask.getParentTasks().isEmpty())) {
rootTasks.add(currTask);
}
if (reducer.getClass() == JoinOperator.class) {
@@ -759,6 +763,41 @@ public final class GenMapRedUtils {
}
/**
+ * Set the key and value description for all the tasks rooted at the given
+ * task. Loops over all the tasks recursively.
+ *
+ * @param task
+ */
+ public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> task) {
+
+ if (task instanceof ConditionalTask) {
+ List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+ .getListTasks();
+ for (Task<? extends Serializable> tsk : listTasks) {
+ setKeyAndValueDescForTaskTree(tsk);
+ }
+ } else if (task instanceof ExecDriver) {
+ MapredWork work = (MapredWork) task.getWork();
+ work.deriveExplainAttributes();
+ HashMap<String, Operator<? extends OperatorDesc>> opMap = work
+ .getAliasToWork();
+ if (opMap != null && !opMap.isEmpty()) {
+ for (Operator<? extends OperatorDesc> op : opMap.values()) {
+ setKeyAndValueDesc(work, op);
+ }
+ }
+ }
+
+ if (task.getChildTasks() == null) {
+ return;
+ }
+
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+ setKeyAndValueDescForTaskTree(childTask);
+ }
+ }
+
+ /**
* create a new plan and return.
*
* @return the new plan
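
setKeyAndValueDescForTaskTree() recurses through ConditionalTask members and child tasks, deriving explain attributes and key/value descriptors for every MapredWork it finds. A minimal driver sketch, assuming the caller holds the plan's root tasks; the wrapper class and method name are hypothetical:

    import java.io.Serializable;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;

    public class TaskTreeDescExample {
      // Derive key/value descriptors for every task reachable from the given roots;
      // ConditionalTask members and child tasks are handled by the recursion inside
      // setKeyAndValueDescForTaskTree().
      public static void deriveAll(List<Task<? extends Serializable>> rootTasks) {
        for (Task<? extends Serializable> rootTask : rootTasks) {
          GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
        }
      }
    }
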
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Fri Apr 26 19:14:49 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.util.StringUtils;
/**
@@ -107,7 +109,7 @@ public class GroupByOptimizer implements
GraphWalker ogw = new DefaultGraphWalker(disp);
// Create a list of topop nodes
- ArrayList<Node> topNodes = new ArrayList<Node>();
+ List<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pctx.getTopOps().values());
ogw.startWalking(topNodes, null);
@@ -174,15 +176,83 @@ public class GroupByOptimizer implements
GroupByOptimizerSortMatch match = checkSortGroupBy(stack, groupByOp);
boolean useMapperSort =
HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT);
+ GroupByDesc groupByOpDesc = groupByOp.getConf();
+
+ boolean removeReduceSink = false;
+ boolean optimizeDistincts = false;
+ boolean setBucketGroup = false;
// Dont remove the operator for distincts
- if (useMapperSort && !groupByOp.getConf().isDistinct() &&
+ if (useMapperSort &&
(match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
- convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
+ if (!groupByOpDesc.isDistinct()) {
+ removeReduceSink = true;
+ }
+ else if (!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+ // Optimize the query: select count(distinct keys) from T, where
+ // T is bucketized and sorted by T
+ // Partial aggregation can be done by the mappers in this scenario
+
+ List<ExprNodeDesc> keys =
+ ((GroupByOperator)
+ (groupByOp.getChildOperators().get(0).getChildOperators().get(0)))
+ .getConf().getKeys();
+ if ((keys == null) || (keys.isEmpty())) {
+ optimizeDistincts = true;
+ }
+ }
}
- else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
+
+ if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
(match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
- groupByOp.getConf().setBucketGroup(true);
+ setBucketGroup = true;
+ }
+
+ if (removeReduceSink) {
+ convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
+ }
+ else if (optimizeDistincts) {
+ // In test mode, dont change the query plan. However, setup a query property
+ pGraphContext.getQueryProperties().setHasMapGroupBy(true);
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE)) {
+ return;
+ }
+ ReduceSinkOperator reduceSinkOp =
+ (ReduceSinkOperator)groupByOp.getChildOperators().get(0);
+ GroupByDesc childGroupByDesc =
+ ((GroupByOperator)
+ (reduceSinkOp.getChildOperators().get(0))).getConf();
+
+ for (int pos = 0; pos < childGroupByDesc.getAggregators().size(); pos++) {
+ AggregationDesc aggr = childGroupByDesc.getAggregators().get(pos);
+ // Partial aggregation is not done for distincts on the mapper
+ // However, if the data is bucketed/sorted on the distinct key, partial aggregation
+ // can be performed on the mapper.
+ if (aggr.getDistinct()) {
+ ArrayList<ExprNodeDesc> parameters = new ArrayList<ExprNodeDesc>();
+ ExprNodeDesc param = aggr.getParameters().get(0);
+ assert param instanceof ExprNodeColumnDesc;
+ ExprNodeColumnDesc paramC = (ExprNodeColumnDesc) param;
+ paramC.setIsPartitionColOrVirtualCol(false);
+ paramC.setColumn("VALUE._col" + pos);
+ parameters.add(paramC);
+ aggr.setParameters(parameters);
+ aggr.setDistinct(false);
+ aggr.setMode(Mode.FINAL);
+ }
+ }
+ // Partial aggregation is performed on the mapper, no distinct processing at the reducer
+ childGroupByDesc.setDistinct(false);
+ groupByOpDesc.setDontResetAggrsDistinct(true);
+ groupByOpDesc.setBucketGroup(true);
+ groupByOp.setUseBucketizedHiveInputFormat(true);
+ // no distinct processing at the reducer
+ // A query like 'select count(distinct key) from T' is transformed into
+ // 'select count(key) from T' as far as the reducer is concerned.
+ reduceSinkOp.getConf().setDistinctColumnIndices(new ArrayList<List<Integer>>());
+ }
+ else if (setBucketGroup) {
+ groupByOpDesc.setBucketGroup(true);
}
}
@@ -339,8 +409,8 @@ public class GroupByOptimizer implements
GroupByOptimizerSortMatch currentMatch =
notDeniedPartns.isEmpty() ? GroupByOptimizerSortMatch.NO_MATCH :
- notDeniedPartns.size() > 1 ? GroupByOptimizerSortMatch.PARTIAL_MATCH :
- GroupByOptimizerSortMatch.COMPLETE_MATCH;
+ notDeniedPartns.size() > 1 ? GroupByOptimizerSortMatch.PARTIAL_MATCH :
+ GroupByOptimizerSortMatch.COMPLETE_MATCH;
for (Partition part : notDeniedPartns) {
List<String> sortCols = part.getSortColNames();
List<String> bucketCols = part.getBucketCols();
@@ -440,8 +510,9 @@ public class GroupByOptimizer implements
case NO_MATCH:
return GroupByOptimizerSortMatch.NO_MATCH;
case COMPLETE_MATCH:
- return ((bucketCols != null) && !bucketCols.isEmpty() && sortCols.containsAll(bucketCols)) ?
- GroupByOptimizerSortMatch.COMPLETE_MATCH : GroupByOptimizerSortMatch.PARTIAL_MATCH;
+ return ((bucketCols != null) && !bucketCols.isEmpty() &&
+ sortCols.containsAll(bucketCols)) ?
+ GroupByOptimizerSortMatch.COMPLETE_MATCH : GroupByOptimizerSortMatch.PARTIAL_MATCH;
case PREFIX_COL1_MATCH:
return GroupByOptimizerSortMatch.NO_MATCH;
case PREFIX_COL2_MATCH:
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Fri Apr 26 19:14:49 2013
@@ -163,8 +163,11 @@ public final class MapJoinFactory {
opTaskMap.put(op, currTask);
List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
- assert (!rootTasks.contains(currTask));
- rootTasks.add(currTask);
+ if(!rootTasks.contains(currTask)
+ && (currTask.getParentTasks() == null
+ || currTask.getParentTasks().isEmpty())) {
+ rootTasks.add(currTask);
+ }
assert currTopOp != null;
opProcCtx.getSeenOps().add(currTopOp);
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Fri Apr 26 19:14:49 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hive.ql.plan.Op
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -88,6 +90,10 @@ import org.apache.hadoop.hive.serde2.typ
public class MapJoinProcessor implements Transform {
private static final Log LOG = LogFactory.getLog(MapJoinProcessor.class.getName());
+ // mapjoin table descriptor contains a key descriptor which needs the field schema
+ // (column type + column name). The column name is not really used anywhere, but it
+ // needs to be passed. Use the string defined below for that.
+ private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey";
private ParseContext pGraphContext;
@@ -107,9 +113,11 @@ public class MapJoinProcessor implements
}
/**
- * Generate the MapRed Local Work
+ * Generate the MapRed Local Work for the given map-join operator
+ *
* @param newWork
* @param mapJoinOp
+ * map-join operator for which local work needs to be generated.
* @param bigTablePos
* @return
* @throws SemanticException
@@ -219,15 +227,31 @@ public class MapJoinProcessor implements
return bigTableAlias;
}
+ /**
+ * Convert the join to a map-join and also generate any local work needed.
+ *
+ * @param newWork MapredWork in which the conversion is to happen
+ * @param op
+ * The join operator that needs to be converted to map-join
+ * @param mapJoinPos position of the big table
+ * @return the alias to the big table
+ * @throws SemanticException
+ */
public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
- throws SemanticException {
- try {
- LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
+ throws SemanticException {
+ LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
newWork.getOpParseCtxMap();
- QBJoinTree newJoinTree = newWork.getJoinTree();
- // generate the map join operator; already checked the map join
- MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
- newJoinTree, mapJoinPos, true, false);
+ QBJoinTree newJoinTree = newWork.getJoinTree();
+ // generate the map join operator; already checked the map join
+ MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
+ newJoinTree, mapJoinPos, true, false);
+ return genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
+ }
+
+ public static String genLocalWorkForMapJoin(MapredWork newWork, MapJoinOperator newMapJoinOp,
+ int mapJoinPos)
+ throws SemanticException {
+ try {
// generate the local work and return the big table alias
String bigTableAlias = MapJoinProcessor
.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
@@ -422,7 +446,7 @@ public class MapJoinProcessor implements
}
TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
- .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"));
+ .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
List<TableDesc> valueFiltedTableDescs = new ArrayList<TableDesc>();
@@ -501,6 +525,65 @@ public class MapJoinProcessor implements
return mapJoinOp;
}
+ /**
+ * convert a sortmerge join to a a map-side join.
+ *
+ * @param opParseCtxMap
+ * @param smbJoinOp
+ * join operator
+ * @param joinTree
+ * qb join tree
+ * @param bigTablePos
+ * position of the source to be read as part of map-reduce framework. All other sources
+ * are cached in memory
+ * @param noCheckOuterJoin
+ */
+ public static MapJoinOperator convertSMBJoinToMapJoin(
+ Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+ SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin)
+ throws SemanticException {
+ // Create a new map join operator
+ SMBJoinDesc smbJoinDesc = smbJoinOp.getConf();
+ List<ExprNodeDesc> keyCols = smbJoinDesc.getKeys().get(Byte.valueOf((byte) 0));
+ TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
+ .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+ MapJoinDesc mapJoinDesc = new MapJoinDesc(smbJoinDesc.getKeys(),
+ keyTableDesc, smbJoinDesc.getExprs(),
+ smbJoinDesc.getValueTblDescs(), smbJoinDesc.getValueTblDescs(),
+ smbJoinDesc.getOutputColumnNames(),
+ bigTablePos, smbJoinDesc.getConds(),
+ smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix());
+
+ RowResolver joinRS = opParseCtxMap.get(smbJoinOp).getRowResolver();
+ // The mapjoin has the same schema as the join operator
+ MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
+ mapJoinDesc, joinRS.getRowSchema(),
+ new ArrayList<Operator<? extends OperatorDesc>>());
+
+ OpParseContext ctx = new OpParseContext(joinRS);
+ opParseCtxMap.put(mapJoinOp, ctx);
+
+ // change the children of the original join operator to point to the map
+ // join operator
+ List<Operator<? extends OperatorDesc>> childOps = smbJoinOp.getChildOperators();
+ for (Operator<? extends OperatorDesc> childOp : childOps) {
+ childOp.replaceParent(smbJoinOp, mapJoinOp);
+ }
+ mapJoinOp.setChildOperators(childOps);
+ smbJoinOp.setChildOperators(null);
+
+ // change the parent of the original SMBjoin operator to point to the map
+ // join operator
+ List<Operator<? extends OperatorDesc>> parentOps = smbJoinOp.getParentOperators();
+ for (Operator<? extends OperatorDesc> parentOp : parentOps) {
+ parentOp.replaceChild(smbJoinOp, mapJoinOp);
+ }
+ mapJoinOp.setParentOperators(parentOps);
+ smbJoinOp.setParentOperators(null);
+
+ return mapJoinOp;
+ }
+
public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator op,
QBJoinTree joinTree, int mapJoinPos) throws SemanticException {
HiveConf hiveConf = pctx.getConf();
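
convertSMBJoinToMapJoin() rewires the SMB join's parents and children onto a new MapJoinOperator, and genLocalWorkForMapJoin() then produces the map-red local work for it. An illustrative way the two new entry points could be chained; the wrapper class and the sources of its arguments are assumptions, not part of this commit:

    import java.util.Map;

    import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
    import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
    import org.apache.hadoop.hive.ql.parse.OpParseContext;
    import org.apache.hadoop.hive.ql.parse.QBJoinTree;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.MapredWork;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    public class SmbToMapJoinExample {
      // Build the map-join operator from the SMB join, then generate the map-red
      // local work for it and return the big-table alias. All arguments are
      // assumed to come from the optimizer context driving the conversion.
      public static String convert(MapredWork newWork,
          Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
          SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos)
          throws SemanticException {
        MapJoinOperator mapJoinOp = MapJoinProcessor.convertSMBJoinToMapJoin(
            opParseCtxMap, smbJoinOp, joinTree, bigTablePos, true /* noCheckOuterJoin */);
        return MapJoinProcessor.genLocalWorkForMapJoin(newWork, mapJoinOp, bigTablePos);
      }
    }
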
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java Fri Apr 26 19:14:49 2013
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -43,9 +44,10 @@ public abstract class SizeBasedBigTableS
for (Operator<? extends OperatorDesc> parentOp : op.getParentOperators()) {
if (parentOp instanceof TableScanOperator) {
- topOps.add((TableScanOperator)parentOp);
- }
- else {
+ topOps.add((TableScanOperator) parentOp);
+ } else if (parentOp instanceof CommonJoinOperator) {
+ topOps.add(null);
+ } else {
getListTopOps(parentOp, topOps);
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java Fri Apr 26 19:14:49 2013
@@ -49,6 +49,9 @@ implements BigTableSelectorForAutoSMJ {
getListTopOps(joinOp, topOps);
int currentPos = 0;
for (TableScanOperator topOp : topOps) {
+ if (topOp == null) {
+ return -1;
+ }
Table table = parseCtx.getTopToTable().get(topOp);
long currentSize = 0;
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java Fri Apr 26 19:14:49 2013
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.optimizer.PrunerUtils;
@@ -290,12 +291,13 @@ public class ListBucketingPruner impleme
List<List<String>> uniqSkewedValues) throws SemanticException {
// For each entry in dynamic-multi-dimension collection.
List<String> skewedCols = part.getSkewedColNames(); // Retrieve skewed column.
- Map<List<String>, String> mappings = part.getSkewedColValueLocationMaps(); // Retrieve skewed
+ Map<SkewedValueList, String> mappings = part.getSkewedColValueLocationMaps(); // Retrieve skewed
// map.
assert ListBucketingPrunerUtils.isListBucketingPart(part) : part.getName()
+ " skewed metadata is corrupted. No skewed column and/or location mappings information.";
List<List<String>> skewedValues = part.getSkewedColValues();
List<Boolean> nonSkewedValueMatchResult = new ArrayList<Boolean>();
+ SkewedValueList skewedValueList = new SkewedValueList();
for (List<String> cell : collections) {
// Walk through the tree to decide value.
// Example: skewed column: C1, C2 ;
@@ -309,8 +311,9 @@ public class ListBucketingPruner impleme
/* It's valid case if a partition: */
/* 1. is defined with skewed columns and skewed values in metadata */
/* 2. doesn't have all skewed values within its data */
- if (mappings.get(cell) != null) {
- selectedPaths.add(new Path(mappings.get(cell)));
+ skewedValueList.setSkewedValueList(cell);
+ if (mappings.get(skewedValueList) != null) {
+ selectedPaths.add(new Path(mappings.get(skewedValueList)));
}
}
} else {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java Fri Apr 26 19:14:49 2013
@@ -17,46 +17,13 @@
*/
package org.apache.hadoop.hive.ql.optimizer.physical;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapRedTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
-import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
-import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
-import org.apache.hadoop.hive.ql.plan.ConditionalWork;
-import org.apache.hadoop.hive.ql.plan.JoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
/*
* Convert tasks involving JOIN into MAPJOIN.
@@ -105,499 +72,11 @@ public class CommonJoinResolver implemen
TaskGraphWalker ogw = new TaskGraphWalker(disp);
// get all the tasks nodes from root task
- ArrayList<Node> topNodes = new ArrayList<Node>();
+ List<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pctx.rootTasks);
// begin to walk through the task tree.
ogw.startWalking(topNodes, null);
return pctx;
}
-
- /**
- * Iterator each tasks. If this task has a local work,create a new task for this local work, named
- * MapredLocalTask. then make this new generated task depends on current task's parent task, and
- * make current task depends on this new generated task
- */
- class CommonJoinTaskDispatcher implements Dispatcher {
-
- private final PhysicalContext physicalContext;
-
- public CommonJoinTaskDispatcher(PhysicalContext context) {
- super();
- physicalContext = context;
- }
-
- // Get the position of the big table for this join operator and the given alias
- private int getPosition(MapredWork work, Operator<? extends OperatorDesc> joinOp,
- String alias) {
- Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
-
- // reduceSinkOperator's child is null, but joinOperator's parents is reduceSink
- while ((parentOp.getChildOperators() != null) &&
- (!parentOp.getChildOperators().isEmpty())) {
- parentOp = parentOp.getChildOperators().get(0);
- }
-
- return joinOp.getParentOperators().indexOf(parentOp);
- }
-
- /*
- * A task and its child task has been converted from join to mapjoin.
- * See if the two tasks can be merged.
- */
- private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) {
- MapRedTask childTask = (MapRedTask)task.getChildTasks().get(0);
- MapredWork work = task.getWork();
- MapredLocalWork localWork = work.getMapLocalWork();
- MapredWork childWork = childTask.getWork();
- MapredLocalWork childLocalWork = childWork.getMapLocalWork();
-
- // Can this be merged
- Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
- if (aliasToWork.size() > 1) {
- return;
- }
-
- Operator<? extends OperatorDesc> op = aliasToWork.values().iterator().next();
- while (op.getChildOperators() != null) {
- // Dont perform this optimization for multi-table inserts
- if (op.getChildOperators().size() > 1) {
- return;
- }
- op = op.getChildOperators().get(0);
- }
-
- if (!(op instanceof FileSinkOperator)) {
- return;
- }
-
- FileSinkOperator fop = (FileSinkOperator)op;
- String workDir = fop.getConf().getDirName();
-
- Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
- if (childPathToAliases.size() > 1) {
- return;
- }
-
- // The filesink writes to a different directory
- if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
- return;
- }
-
- // Either of them should not be bucketed
- if ((localWork.getBucketMapjoinContext() != null) ||
- (childLocalWork.getBucketMapjoinContext() != null)) {
- return;
- }
-
- // Merge the trees
- if (childWork.getAliasToWork().size() > 1) {
- return;
- }
-
- Operator<? extends Serializable> childAliasOp =
- childWork.getAliasToWork().values().iterator().next();
- if (fop.getParentOperators().size() > 1) {
- return;
- }
-
- // Merge the 2 trees - remove the FileSinkOperator from the first tree pass it to the
- // top of the second
- Operator<? extends Serializable> parentFOp = fop.getParentOperators().get(0);
- parentFOp.getChildOperators().remove(fop);
- parentFOp.getChildOperators().add(childAliasOp);
- List<Operator<? extends OperatorDesc>> parentOps =
- new ArrayList<Operator<? extends OperatorDesc>>();
- parentOps.add(parentFOp);
- childAliasOp.setParentOperators(parentOps);
-
- work.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
- for (Map.Entry<String, PartitionDesc> childWorkEntry :
- childWork.getPathToPartitionInfo().entrySet()) {
- if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
- work.getPathToPartitionInfo().put(childWorkEntry.getKey(), childWorkEntry.getValue());
- }
- }
-
- localWork.getAliasToFetchWork().putAll(childLocalWork.getAliasToFetchWork());
- localWork.getAliasToWork().putAll(childLocalWork.getAliasToWork());
-
- // remove the child task
- List<Task<? extends Serializable>> oldChildTasks = childTask.getChildTasks();
- task.setChildTasks(oldChildTasks);
- if (oldChildTasks != null) {
- for (Task<? extends Serializable> oldChildTask : oldChildTasks) {
- oldChildTask.getParentTasks().remove(childTask);
- oldChildTask.getParentTasks().add(task);
- }
- }
- }
-
- // create map join task and set big table as bigTablePosition
- private ObjectPair<MapRedTask, String> convertTaskToMapJoinTask(MapredWork newWork,
- int bigTablePosition) throws SemanticException {
- // create a mapred task for this work
- MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
- .getParseContext().getConf());
- JoinOperator newJoinOp = getJoinOp(newTask);
-
- // optimize this newWork, assuming the big table is at bigTablePosition
- String bigTableAlias =
- MapJoinProcessor.genMapJoinOpAndLocalWork(newWork, newJoinOp, bigTablePosition);
- return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
- }
-
- private Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
- ConditionalTask conditionalTask, Context context)
- throws SemanticException {
-
- // check whether the task contains a common join op; if so, return it
- JoinOperator joinOp = getJoinOp(currTask);
- if (joinOp == null || joinOp.getConf().isFixedAsSorted()) {
- return null;
- }
- currTask.setTaskTag(Task.COMMON_JOIN);
-
- MapredWork currWork = currTask.getWork();
-
- // create conditional work list and task list
- List<Serializable> listWorks = new ArrayList<Serializable>();
- List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
-
- // create alias to task mapping and alias to input file mapping for resolver
- HashMap<String, Task<? extends Serializable>> aliasToTask = new HashMap<String, Task<? extends Serializable>>();
- HashMap<String, ArrayList<String>> pathToAliases = currWork.getPathToAliases();
- Map<String, Operator<? extends OperatorDesc>> aliasToWork = currWork.getAliasToWork();
-
- // get parseCtx for this Join Operator
- ParseContext parseCtx = physicalContext.getParseContext();
- QBJoinTree joinTree = parseCtx.getJoinContext().get(joinOp);
-
- // start to generate multiple map join tasks
- JoinDesc joinDesc = joinOp.getConf();
- Byte[] order = joinDesc.getTagOrder();
- int numAliases = order.length;
-
- long aliasTotalKnownInputSize = 0;
- HashMap<String, Long> aliasToSize = new HashMap<String, Long>();
- try {
- // go over all the input paths, and calculate a known total size, known
- // size for each input alias.
- Utilities.getInputSummary(context, currWork, null).getLength();
-
- // set the alias-to-size mapping; this is used to determine, if one table
- // is chosen as the big table, the total size of the remaining tables,
- // which become the small tables.
- for (Map.Entry<String, ArrayList<String>> entry : pathToAliases.entrySet()) {
- String path = entry.getKey();
- List<String> aliasList = entry.getValue();
- ContentSummary cs = context.getCS(path);
- if (cs != null) {
- long size = cs.getLength();
- for (String alias : aliasList) {
- aliasTotalKnownInputSize += size;
- Long es = aliasToSize.get(alias);
- if (es == null) {
- es = new Long(0);
- }
- es += size;
- aliasToSize.put(alias, es);
- }
- }
- }
-
- HashSet<Integer> bigTableCandidates = MapJoinProcessor.getBigTableCandidates(joinDesc.getConds());
-
- // no table could be the big table; there is no need to convert
- if (bigTableCandidates == null) {
- return null;
- }
-
- Configuration conf = context.getConf();
-
- // If the sizes of at least n-1 tables in an n-way join are known, and their sum is smaller than
- // the threshold size, convert the join into a map-join and don't create a conditional task.
- boolean convertJoinMapJoin = HiveConf.getBoolVar(conf,
- HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK);
- int bigTablePosition = -1;
- if (convertJoinMapJoin) {
- // This is the threshold that the user has specified to fit in mapjoin
- long mapJoinSize = HiveConf.getLongVar(conf,
- HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
-
- boolean bigTableFound = false;
- long largestBigTableCandidateSize = 0;
- long sumTableSizes = 0;
- for (String alias : aliasToWork.keySet()) {
- int tablePosition = getPosition(currWork, joinOp, alias);
- boolean bigTableCandidate = bigTableCandidates.contains(tablePosition);
- Long size = aliasToSize.get(alias);
- // The size is not available at compile time if the input is a sub-query.
- // If the sizes of at least n-1 inputs for an n-way join are available at compile time,
- // and the sum of them is less than the specified threshold, then convert the join
- // into a map-join without the conditional task.
- if ((size == null) || (size > mapJoinSize)) {
- sumTableSizes += largestBigTableCandidateSize;
- if (bigTableFound || (sumTableSizes > mapJoinSize) || !bigTableCandidate) {
- convertJoinMapJoin = false;
- break;
- }
- bigTableFound = true;
- bigTablePosition = tablePosition;
- largestBigTableCandidateSize = mapJoinSize + 1;
- } else {
- if (bigTableCandidate && size > largestBigTableCandidateSize) {
- bigTablePosition = tablePosition;
- sumTableSizes += largestBigTableCandidateSize;
- largestBigTableCandidateSize = size;
- }
- else {
- sumTableSizes += size;
- }
-
- if (sumTableSizes > mapJoinSize) {
- convertJoinMapJoin = false;
- break;
- }
- }
- }
- }
-
- String bigTableAlias = null;
- currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
- currWork.setJoinTree(joinTree);
-
- if (convertJoinMapJoin) {
- // create map join task and set big table as bigTablePosition
- MapRedTask newTask = convertTaskToMapJoinTask(currWork, bigTablePosition).getFirst();
-
- newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
- replaceTask(currTask, newTask, physicalContext);
-
- // Can this task be merged with its child task? This can happen if a big table is being
- // joined with multiple small tables on different keys.
- // Further optimizations are possible here, a join which has been converted to a mapjoin
- // followed by a mapjoin can be performed in a single MR job.
- if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)
- && (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP)) {
- mergeMapJoinTaskWithChildMapJoinTask(newTask);
- }
-
- return newTask;
- }
-
- long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(conf,
- HiveConf.ConfVars.HIVESMALLTABLESFILESIZE);
- String xml = currWork.toXML();
- for (int i = 0; i < numAliases; i++) {
- // this table cannot be big table
- if (!bigTableCandidates.contains(i)) {
- continue;
- }
-
- // deep copy a new mapred work from xml
- InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
-
- // create map join task and set big table as i
- ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);
- MapRedTask newTask = newTaskAlias.getFirst();
- bigTableAlias = newTaskAlias.getSecond();
-
- Long aliasKnownSize = aliasToSize.get(bigTableAlias);
- if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) {
- long smallTblTotalKnownSize = aliasTotalKnownInputSize
- - aliasKnownSize.longValue();
- if(smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) {
- //this table is not good to be a big table.
- continue;
- }
- }
-
- // add into conditional task
- listWorks.add(newTask.getWork());
- listTasks.add(newTask);
- newTask.setTaskTag(Task.CONVERTED_MAPJOIN);
-
- //set up backup task
- newTask.setBackupTask(currTask);
- newTask.setBackupChildrenTasks(currTask.getChildTasks());
-
- // put the mapping alias to task
- aliasToTask.put(bigTableAlias, newTask);
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw new SemanticException("Generate Map Join Task Error: " + e.getMessage());
- }
-
- // insert current common join task to conditional task
- listWorks.add(currTask.getWork());
- listTasks.add(currTask);
- // clear JoinTree and OP Parse Context
- currWork.setOpParseCtxMap(null);
- currWork.setJoinTree(null);
-
- // create conditional task and insert conditional task into task tree
- ConditionalWork cndWork = new ConditionalWork(listWorks);
- ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());
- cndTsk.setListTasks(listTasks);
-
- // set resolver and resolver context
- cndTsk.setResolver(new ConditionalResolverCommonJoin());
- ConditionalResolverCommonJoinCtx resolverCtx = new ConditionalResolverCommonJoinCtx();
- resolverCtx.setPathToAliases(pathToAliases);
- resolverCtx.setAliasToKnownSize(aliasToSize);
- resolverCtx.setAliasToTask(aliasToTask);
- resolverCtx.setCommonJoinTask(currTask);
- resolverCtx.setLocalTmpDir(context.getLocalScratchDir(false));
- resolverCtx.setHdfsTmpDir(context.getMRScratchDir());
- cndTsk.setResolverCtx(resolverCtx);
-
- //replace the current task with the new generated conditional task
- this.replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext);
- return cndTsk;
- }
-
- private void replaceTaskWithConditionalTask(
- Task<? extends Serializable> currTask, ConditionalTask cndTsk,
- PhysicalContext physicalContext) {
- // add this task into task tree
- // set all parent tasks
- List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
- currTask.setParentTasks(null);
- if (parentTasks != null) {
- for (Task<? extends Serializable> tsk : parentTasks) {
- // make the newly generated conditional task depend on all the parent tasks of the current task.
- tsk.addDependentTask(cndTsk);
- // remove the current task from its original parent task's dependent tasks
- tsk.removeDependentTask(currTask);
- }
- } else {
- // remove the current task from the root tasks and add the conditional task to the root tasks
- physicalContext.removeFromRootTask(currTask);
- physicalContext.addToRootTask(cndTsk);
- }
- // set all child tasks
- List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
- if (oldChildTasks != null) {
- for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
- if (tsk.equals(currTask)) {
- continue;
- }
- for (Task<? extends Serializable> oldChild : oldChildTasks) {
- tsk.addDependentTask(oldChild);
- }
- }
- }
- }
-
- // Replace the task with the new task. Copy the children and parents of the old
- // task to the new task.
- private void replaceTask(
- Task<? extends Serializable> currTask, Task<? extends Serializable> newTask,
- PhysicalContext physicalContext) {
- // add this task into task tree
- // set all parent tasks
- List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
- currTask.setParentTasks(null);
- if (parentTasks != null) {
- for (Task<? extends Serializable> tsk : parentTasks) {
- // remove the current task from its original parent task's dependent tasks
- tsk.removeDependentTask(currTask);
- // make the newly generated task depend on all the parent tasks of the current task.
- tsk.addDependentTask(newTask);
- }
- } else {
- // remove the current task from the root tasks and add the new task to the root tasks
- physicalContext.removeFromRootTask(currTask);
- physicalContext.addToRootTask(newTask);
- }
-
- // set all child tasks
- List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
- currTask.setChildTasks(null);
- if (oldChildTasks != null) {
- for (Task<? extends Serializable> tsk : oldChildTasks) {
- // remove the current task from each old child task's parent tasks
- tsk.getParentTasks().remove(currTask);
- // make each old child task depend on the newly generated task.
- newTask.addDependentTask(tsk);
- }
- }
- }
-
- @Override
- public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
- throws SemanticException {
- if (nodeOutputs == null || nodeOutputs.length == 0) {
- throw new SemanticException("No Dispatch Context");
- }
-
- TaskGraphWalkerContext walkerCtx = (TaskGraphWalkerContext) nodeOutputs[0];
-
- Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
- // if this is not a map-reduce task and not a conditional task, just skip it
- if (currTask.isMapRedTask()) {
- if (currTask instanceof ConditionalTask) {
- // get the list of task
- List<Task<? extends Serializable>> taskList = ((ConditionalTask) currTask).getListTasks();
- for (Task<? extends Serializable> tsk : taskList) {
- if (tsk.isMapRedTask()) {
- Task<? extends Serializable> newTask = this.processCurrentTask((MapRedTask) tsk,
- ((ConditionalTask) currTask), physicalContext.getContext());
- walkerCtx.addToDispatchList(newTask);
- }
- }
- } else {
- Task<? extends Serializable> newTask =
- this.processCurrentTask((MapRedTask) currTask, null, physicalContext.getContext());
- walkerCtx.addToDispatchList(newTask);
- }
- }
- return null;
- }
-
- /*
- * If any operator which does not allow map-side conversion is present in the mapper, don't
- * convert the task into a conditional task.
- */
- private boolean checkOperatorOKMapJoinConversion(Operator<? extends OperatorDesc> op) {
- if (!op.opAllowedConvertMapJoin()) {
- return false;
- }
-
- if (op.getChildOperators() == null) {
- return true;
- }
-
- for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
- if (!checkOperatorOKMapJoinConversion(childOp)) {
- return false;
- }
- }
-
- return true;
- }
-
- private JoinOperator getJoinOp(MapRedTask task) throws SemanticException {
- MapredWork work = task.getWork();
- if (work == null) {
- return null;
- }
- Operator<? extends OperatorDesc> reducerOp = work.getReducer();
- if (reducerOp instanceof JoinOperator) {
- /* Is any operator present, which prevents the conversion */
- Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
- for (Operator<? extends OperatorDesc> op : aliasToWork.values()) {
- if (!checkOperatorOKMapJoinConversion(op)) {
- return null;
- }
- }
- return (JoinOperator) reducerOp;
- } else {
- return null;
- }
- }
- }
}
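
The dispatcher removed above boils down to one sizing decision: an n-way common join may be converted to a map-join without a conditional backup task only if at most one input's size is unknown or above the threshold, that input is a legal big-table candidate, and the remaining (small) inputs together stay under HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD. A minimal standalone sketch of that check, with hypothetical names and detached from Hive's Task/Work classes, could look like this:

import java.util.List;
import java.util.Set;

// Simplified, hypothetical illustration of the size-based decision made by the
// removed dispatcher; not the actual Hive implementation.
public final class MapJoinDecisionSketch {

  /** Returns the chosen big-table position, or -1 if the join must stay conditional. */
  public static int chooseBigTable(List<Long> sizes,            // null entry = size unknown at compile time
                                   Set<Integer> bigTableCandidates,
                                   long noConditionalTaskThreshold) {
    int bigTablePosition = -1;
    long largestCandidateSize = 0;
    long sumOfSmallTables = 0;
    boolean unknownOrOversizedSeen = false;

    for (int pos = 0; pos < sizes.size(); pos++) {
      Long size = sizes.get(pos);
      boolean candidate = bigTableCandidates.contains(pos);

      if (size == null || size > noConditionalTaskThreshold) {
        // At most one such table is tolerated, and it must be allowed to be the big table.
        if (unknownOrOversizedSeen || !candidate) {
          return -1;
        }
        unknownOrOversizedSeen = true;
        sumOfSmallTables += largestCandidateSize;   // previous best candidate becomes a small table
        bigTablePosition = pos;
        largestCandidateSize = Long.MAX_VALUE;      // no known size can displace this choice
      } else if (candidate && size > largestCandidateSize) {
        sumOfSmallTables += largestCandidateSize;   // demote the previous candidate to a small table
        largestCandidateSize = size;
        bigTablePosition = pos;
      } else {
        sumOfSmallTables += size;
      }
      if (sumOfSmallTables > noConditionalTaskThreshold) {
        return -1;
      }
    }
    return bigTablePosition;
  }
}

For example, sizes of {100, null, 50} with position 1 as the only big-table candidate and a threshold of 1000 would yield 1; if the unknown input were not a candidate, the method would return -1 and the caller would fall back to the conditional-task path.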
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Fri Apr 26 19:14:49 2013
@@ -51,7 +51,15 @@ public class PhysicalOptimizer {
}
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
resolvers.add(new CommonJoinResolver());
+
+ // The joins have been automatically converted to map-joins.
+ // However, if the joins were converted to sort-merge joins automatically,
+ // they should also be tried as map-joins.
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_TOMAPJOIN)) {
+ resolvers.add(new SortMergeJoinResolver());
+ }
}
+
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER)) {
resolvers.add(new IndexWhereResolver());
}
@@ -61,7 +69,7 @@ public class PhysicalOptimizer {
}
// Physical optimizers which follow this need to be careful not to invalidate the inferences
- // made by this optimizer. Only optimizers which depend on the results of this one should
+ // made by this optimizer. Only optimizers which depend on the results of this one should
// follow it.
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT)) {
resolvers.add(new BucketingSortingInferenceOptimizer());
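
The hunk above only registers the new SortMergeJoinResolver when HIVE_AUTO_SORTMERGE_JOIN_TOMAPJOIN is set and join conversion (HIVECONVERTJOIN) is already enabled, so the sort-merge-to-mapjoin pass never runs on its own and always runs after the CommonJoinResolver. A rough sketch of this nested, flag-gated registration pattern, using generic stand-in names rather than Hive's resolver API:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: resolvers run in registration order, and the nested flag
// is consulted only when the outer flag is enabled, mirroring the diff above.
public class ResolverChainSketch {

  interface PlanResolver {
    void resolve(Map<String, Object> planContext);
  }

  private final List<PlanResolver> resolvers = new ArrayList<>();

  public ResolverChainSketch(boolean convertJoin, boolean autoSortMergeJoinToMapJoin) {
    if (convertJoin) {
      resolvers.add(ctx -> ctx.put("commonJoinResolved", Boolean.TRUE));
      if (autoSortMergeJoinToMapJoin) {
        // Only meaningful once common joins have been converted to map-joins.
        resolvers.add(ctx -> ctx.put("sortMergeJoinResolved", Boolean.TRUE));
      }
    }
  }

  public void optimize(Map<String, Object> planContext) {
    for (PlanResolver resolver : resolvers) {
      resolver.resolve(planContext);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> ctx = new LinkedHashMap<>();
    new ResolverChainSketch(true, true).optimize(ctx);
    System.out.println(ctx);   // {commonJoinResolved=true, sortMergeJoinResolved=true}
  }
}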
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcContext.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcContext.java Fri Apr 26 19:14:49 2013
@@ -40,7 +40,7 @@ public class UnionProcContext implements
private final transient boolean[] mapOnlySubqSet;
private final transient boolean[] rootTask;
- private transient int numInputs;
+ private final transient int numInputs;
public UnionParseContext(int numInputs) {
this.numInputs = numInputs;
@@ -70,27 +70,22 @@ public class UnionProcContext implements
return numInputs;
}
- public void setNumInputs(int numInputs) {
- this.numInputs = numInputs;
- }
-
public boolean allMapOnlySubQ() {
- if (mapOnlySubq != null) {
- for (boolean mapOnly : mapOnlySubq) {
- if (!mapOnly) {
- return false;
- }
- }
- }
- return true;
+ return isAllTrue(mapOnlySubq);
}
public boolean allMapOnlySubQSet() {
- if (mapOnlySubqSet != null) {
- for (boolean mapOnlySet : mapOnlySubqSet) {
- if (!mapOnlySet) {
- return false;
- }
+ return isAllTrue(mapOnlySubqSet);
+ }
+
+ public boolean allRootTasks() {
+ return isAllTrue(rootTask);
+ }
+
+ public boolean isAllTrue(boolean[] array) {
+ for (boolean value : array) {
+ if (!value) {
+ return false;
}
}
return true;
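
One nuance of the consolidation above: the old per-field loops treated a null array as "all true", whereas the shared isAllTrue() helper assumes a non-null array, which is safe as long as the boolean arrays are always allocated in the constructor (the final modifiers suggest they are). A tiny standalone check of the helper's semantics, as a hypothetical test harness rather than part of the patch:

// Run with "java -ea IsAllTrueSketch" to enable the assertions.
public class IsAllTrueSketch {

  static boolean isAllTrue(boolean[] array) {
    for (boolean value : array) {
      if (!value) {
        return false;
      }
    }
    return true;                  // an empty array is vacuously "all true"
  }

  public static void main(String[] args) {
    assert isAllTrue(new boolean[] {});                    // vacuous truth
    assert isAllTrue(new boolean[] {true, true, true});
    assert !isAllTrue(new boolean[] {true, false, true});  // any false wins
    System.out.println("isAllTrue semantics hold");
  }
}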
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java Fri Apr 26 19:14:49 2013
@@ -138,20 +138,21 @@ public final class UnionProcFactory {
}
start--;
}
+ assert parentUnionOperator != null;
// default to false
boolean mapOnly = false;
- if (parentUnionOperator != null) {
- UnionParseContext parentUCtx =
+ boolean rootTask = false;
+ UnionParseContext parentUCtx =
ctx.getUnionParseContext(parentUnionOperator);
- if (parentUCtx != null && parentUCtx.allMapOnlySubQSet()) {
- mapOnly = parentUCtx.allMapOnlySubQ();
- }
+ if (parentUCtx != null && parentUCtx.allMapOnlySubQSet()) {
+ mapOnly = parentUCtx.allMapOnlySubQ();
+ rootTask = parentUCtx.allRootTasks();
}
uCtx.setMapOnlySubq(pos, mapOnly);
- uCtx.setRootTask(pos, false);
+ uCtx.setRootTask(pos, rootTask);
ctx.setUnionParseContext(union, uCtx);
return null;
}
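
The change above makes a nested union inherit both flags from its enclosing union's parse context: mapOnly as before, and now rootTask as well, where previously rootTask was unconditionally forced to false at this position (the explicit null check on parentUnionOperator also becomes an assert). A condensed, hypothetical illustration of the inherit-or-default logic, with stand-in names rather than Hive's UnionParseContext accessors:

// Illustrative only: a nested union position inherits mapOnly/rootTask from its
// parent union's context when that context exists and its sub-query flags are
// fully set; otherwise both default to false.
public final class UnionFlagPropagationSketch {

  static final class ParentUnionCtx {
    final boolean allMapOnlySubqSet;
    final boolean allMapOnlySubq;
    final boolean allRootTasks;

    ParentUnionCtx(boolean allMapOnlySubqSet, boolean allMapOnlySubq, boolean allRootTasks) {
      this.allMapOnlySubqSet = allMapOnlySubqSet;
      this.allMapOnlySubq = allMapOnlySubq;
      this.allRootTasks = allRootTasks;
    }
  }

  /** Returns {mapOnly, rootTask} for the nested union position. */
  static boolean[] inherit(ParentUnionCtx parent) {
    boolean mapOnly = false;    // defaults, matching the original code
    boolean rootTask = false;
    if (parent != null && parent.allMapOnlySubqSet) {
      mapOnly = parent.allMapOnlySubq;
      rootTask = parent.allRootTasks;   // previously this was always set to false here
    }
    return new boolean[] {mapOnly, rootTask};
  }
}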
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Fri Apr 26 19:14:49 2013
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryProperties;
@@ -965,7 +966,7 @@ public abstract class BaseSemanticAnalyz
* @return
*/
protected ListBucketingCtx constructListBucketingCtx(List<String> skewedColNames,
- List<List<String>> skewedValues, Map<List<String>, String> skewedColValueLocationMaps,
+ List<List<String>> skewedValues, Map<SkewedValueList, String> skewedColValueLocationMaps,
boolean isStoredAsSubDirectories, HiveConf conf) {
ListBucketingCtx lbCtx = new ListBucketingCtx();
lbCtx.setSkewedColNames(skewedColNames);