Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 05:44:26 UTC
svn commit: r1629562 [6/38] - in /hive/branches/spark: ./ accumulo-handler/
beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ext/ common/
common/src/java/org/apache/hadoop/hive/conf/
common/src/test/org/apache/hadoop/hive/common/type/ contrib/src...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java Mon Oct 6 03:44:13 2014
@@ -16,12 +16,19 @@ package org.apache.hadoop.hive.ql.io.par
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
@@ -145,6 +152,32 @@ public enum ETypeConverter {
}
};
}
+ },
+ ECHAR_CONVERTER(HiveCharWritable.class) {
+ @Override
+ Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) {
+ return new BinaryConverter<HiveCharWritable>(type, parent, index) {
+ @Override
+ protected HiveCharWritable convert(Binary binary) {
+ HiveChar hiveChar = new HiveChar();
+ hiveChar.setValue(binary.toStringUsingUTF8());
+ return new HiveCharWritable(hiveChar);
+ }
+ };
+ }
+ },
+ EVARCHAR_CONVERTER(HiveVarcharWritable.class) {
+ @Override
+ Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) {
+ return new BinaryConverter<HiveVarcharWritable>(type, parent, index) {
+ @Override
+ protected HiveVarcharWritable convert(Binary binary) {
+ HiveVarchar hiveVarchar = new HiveVarchar();
+ hiveVarchar.setValue(binary.toStringUsingUTF8());
+ return new HiveVarcharWritable(hiveVarchar);
+ }
+ };
+ }
};
final Class<?> _type;
@@ -160,7 +193,7 @@ public enum ETypeConverter {
abstract Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent);
public static Converter getNewConverter(final PrimitiveType type, final int index,
- final HiveGroupConverter parent) {
+ final HiveGroupConverter parent, List<TypeInfo> hiveSchemaTypeInfos) {
if (type.isPrimitive() && (type.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96))) {
//TODO- cleanup once parquet support Timestamp type annotation.
return ETypeConverter.ETIMESTAMP_CONVERTER.getConverter(type, index, parent);
@@ -168,7 +201,15 @@ public enum ETypeConverter {
if (OriginalType.DECIMAL == type.getOriginalType()) {
return EDECIMAL_CONVERTER.getConverter(type, index, parent);
} else if (OriginalType.UTF8 == type.getOriginalType()) {
- return ESTRING_CONVERTER.getConverter(type, index, parent);
+ if (hiveSchemaTypeInfos.get(index).getTypeName()
+ .startsWith(serdeConstants.CHAR_TYPE_NAME)) {
+ return ECHAR_CONVERTER.getConverter(type, index, parent);
+ } else if (hiveSchemaTypeInfos.get(index).getTypeName()
+ .startsWith(serdeConstants.VARCHAR_TYPE_NAME)) {
+ return EVARCHAR_CONVERTER.getConverter(type, index, parent);
+ } else if (type.isPrimitive()) {
+ return ESTRING_CONVERTER.getConverter(type, index, parent);
+ }
}
Class<?> javaType = type.getPrimitiveTypeName().javaType;
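For readers following the new UTF8 dispatch above: when a Parquet column is UTF8-annotated, the converter is now chosen from the Hive-side type name instead of defaulting to string. A minimal sketch of that routing, using a hypothetical three-column schema (the serdeConstants names and the startsWith checks come from the patch; everything else here is illustrative):

    import org.apache.hadoop.hive.serde.serdeConstants;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class ConverterDispatchSketch {
      public static void main(String[] args) {
        // Hypothetical Hive schema for three UTF8 Parquet columns.
        String[] hiveTypes = {"string", "char(10)", "varchar(20)"};
        for (String t : hiveTypes) {
          TypeInfo info = TypeInfoUtils.getTypeInfoFromTypeString(t);
          String typeName = info.getTypeName();  // e.g. "char(10)"
          if (typeName.startsWith(serdeConstants.CHAR_TYPE_NAME)) {
            System.out.println(t + " -> ECHAR_CONVERTER");
          } else if (typeName.startsWith(serdeConstants.VARCHAR_TYPE_NAME)) {
            System.out.println(t + " -> EVARCHAR_CONVERTER");
          } else {
            System.out.println(t + " -> ESTRING_CONVERTER");
          }
        }
      }
    }

Note the order of the checks matters: "varchar(20)" does not start with "char", so testing CHAR_TYPE_NAME first is safe.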
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java Mon Oct 6 03:44:13 2014
@@ -13,6 +13,9 @@
*/
package org.apache.hadoop.hive.ql.io.parquet.convert;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Writable;
import parquet.io.api.Converter;
@@ -23,17 +26,20 @@ import parquet.schema.Type.Repetition;
public abstract class HiveGroupConverter extends GroupConverter {
protected static Converter getConverterFromDescription(final Type type, final int index,
- final HiveGroupConverter parent) {
+ final HiveGroupConverter parent, List<TypeInfo> hiveSchemaTypeInfos) {
if (type == null) {
return null;
}
if (type.isPrimitive()) {
- return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent);
+ return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent,
+ hiveSchemaTypeInfos);
} else {
if (type.asGroupType().getRepetition() == Repetition.REPEATED) {
- return new ArrayWritableGroupConverter(type.asGroupType(), parent, index);
+ return new ArrayWritableGroupConverter(type.asGroupType(), parent, index,
+ hiveSchemaTypeInfos);
} else {
- return new DataWritableGroupConverter(type.asGroupType(), parent, index);
+ return new DataWritableGroupConverter(type.asGroupType(), parent, index,
+ hiveSchemaTypeInfos);
}
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Mon Oct 6 03:44:13 2014
@@ -14,6 +14,7 @@
package org.apache.hadoop.hive.ql.io.parquet.read;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -23,6 +24,8 @@ import org.apache.hadoop.hive.ql.io.IOCo
import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.util.StringUtils;
@@ -53,7 +56,7 @@ public class DataWritableReadSupport ext
* From a string which columns names (including hive column), return a list
* of string columns
*
- * @param columns comma separated list of columns
+ * @param comma separated list of columns
* @return list with virtual columns removed
*/
private static List<String> getColumns(final String columns) {
@@ -61,6 +64,27 @@ public class DataWritableReadSupport ext
removeVirtualColumns(StringUtils.getStringCollection(columns));
}
+ private static List<TypeInfo> getColumnTypes(Configuration configuration) {
+
+ List<String> columnNames;
+ String columnNamesProperty = configuration.get(IOConstants.COLUMNS);
+ if (columnNamesProperty.length() == 0) {
+ columnNames = new ArrayList<String>();
+ } else {
+ columnNames = Arrays.asList(columnNamesProperty.split(","));
+ }
+ List<TypeInfo> columnTypes;
+ String columnTypesProperty = configuration.get(IOConstants.COLUMNS_TYPES);
+ if (columnTypesProperty.length() == 0) {
+ columnTypes = new ArrayList<TypeInfo>();
+ } else {
+ columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypesProperty);
+ }
+
+ columnTypes = VirtualColumn.removeVirtualColumnTypes(columnNames, columnTypes);
+ return columnTypes;
+ }
+
/**
*
* It creates the readContext for Parquet side with the requested schema during the init phase.
@@ -149,7 +173,8 @@ public class DataWritableReadSupport ext
}
final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
- return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
+ return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema,
+ getColumnTypes(configuration));
}
/**
@@ -169,4 +194,4 @@ public class DataWritableReadSupport ext
}
return requestedSchema;
}
-}
\ No newline at end of file
+}
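For context, the new getColumnTypes helper consumes two plain configuration strings. A minimal sketch of the input it expects (the IOConstants keys are the ones used in the patch; the column names and types are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.IOConstants;

    public class ReadSupportConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Comma-separated column names; may include Hive virtual columns:
        conf.set(IOConstants.COLUMNS, "id,name,BLOCK__OFFSET__INSIDE__FILE");
        // Matching types, parsed by TypeInfoUtils.getTypeInfosFromTypeString:
        conf.set(IOConstants.COLUMNS_TYPES, "int,varchar(20),bigint");
        // getColumnTypes(conf) then drops the virtual column's type via
        // VirtualColumn.removeVirtualColumnTypes, leaving types for id and name.
      }
    }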
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java Mon Oct 6 03:44:13 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.lockmg
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.thrift.TException;
@@ -43,10 +42,10 @@ public class DbLockManager implements Hi
private static final long MAX_SLEEP = 15000;
private HiveLockManagerCtx context;
private Set<DbHiveLock> locks;
- private IMetaStoreClient client;
+ private HiveMetaStoreClient client;
private long nextSleep = 50;
- DbLockManager(IMetaStoreClient client) {
+ DbLockManager(HiveMetaStoreClient client) {
locks = new HashSet<DbHiveLock>();
this.client = client;
}
@@ -211,8 +210,8 @@ public class DbLockManager implements Hi
/**
* Clear the memory of the locks in this object. This won't clear the locks from the database.
* It is for use with
- * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient).commitTxn} and
- * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient).rollbackTxn}.
+ * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).commitTxn} and
+ * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).rollbackTxn}.
*/
void clearLocalLockRecords() {
locks.clear();
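The MAX_SLEEP and nextSleep fields in the hunk above drive the wait between lock attempts. The retry loop itself is outside this hunk, so the following is only a hypothetical sketch of that style of capped exponential backoff, with tryAcquire() as a stand-in for the real metastore call:

    // Hypothetical sketch, not code from the patch: double the wait between
    // attempts, starting at 50 ms and capping at MAX_SLEEP (15 s).
    long nextSleep = 50;
    final long maxSleep = 15000;
    while (!tryAcquire()) {        // tryAcquire() is a stand-in
      Thread.sleep(nextSleep);
      nextSleep = Math.min(nextSleep * 2, maxSleep);
    }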
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Mon Oct 6 03:44:13 2014
@@ -31,8 +31,6 @@ import org.apache.hadoop.hive.ql.QueryPl
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.thrift.TException;
@@ -48,7 +46,7 @@ public class DbTxnManager extends HiveTx
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
private DbLockManager lockMgr = null;
- private IMetaStoreClient client = null;
+ private HiveMetaStoreClient client = null;
private long txnId = 0;
DbTxnManager() {
@@ -286,7 +284,7 @@ public class DbTxnManager extends HiveTx
public ValidTxnList getValidTxns() throws LockException {
init();
try {
- return client.getValidTxns(txnId);
+ return client.getValidTxns();
} catch (TException e) {
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
e);
@@ -313,6 +311,7 @@ public class DbTxnManager extends HiveTx
try {
if (txnId > 0) rollbackTxn();
if (lockMgr != null) lockMgr.close();
+ if (client != null) client.close();
} catch (Exception e) {
LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage()
+ ">, swallowing as there is nothing we can do with it.");
@@ -327,12 +326,10 @@ public class DbTxnManager extends HiveTx
"methods.");
}
try {
- Hive db = Hive.get(conf);
- client = db.getMSC();
+ client = new HiveMetaStoreClient(conf);
} catch (MetaException e) {
- throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
- } catch (HiveException e) {
- throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
+ throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(),
+ e);
}
}
}
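With this change DbTxnManager constructs and owns its own HiveMetaStoreClient instead of borrowing the shared one from Hive.get(conf).getMSC(), which is why closeTxnManager() now closes it. A minimal sketch of the resulting client lifecycle (error handling trimmed; the calls shown are the ones used in the patch):

    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

    HiveConf conf = new HiveConf();
    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    try {
      // Note the branch is back on the no-argument overload here:
      ValidTxnList txns = client.getValidTxns();
      // ... use the snapshot while this transaction manager is open ...
    } finally {
      client.close();  // mirrors the new close in closeTxnManager()
    }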
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Mon Oct 6 03:44:13 2014
@@ -878,23 +878,6 @@ public class Hive {
/**
* Drops table along with the data in it. If the table doesn't exist then it
- * is a no-op. If ifPurge option is specified it is passed to the
- * hdfs command that removes table data from warehouse to make it skip trash.
- *
- * @param tableName
- * table to drop
- * @param ifPurge
- * completely purge the table (skipping trash) while removing data from warehouse
- * @throws HiveException
- * thrown if the drop fails
- */
- public void dropTable(String tableName, boolean ifPurge) throws HiveException {
- String[] names = Utilities.getDbTableName(tableName);
- dropTable(names[0], names[1], true, true, ifPurge);
- }
-
- /**
- * Drops table along with the data in it. If the table doesn't exist then it
* is a no-op
*
* @param tableName
@@ -903,7 +886,8 @@ public class Hive {
* thrown if the drop fails
*/
public void dropTable(String tableName) throws HiveException {
- dropTable(tableName, false);
+ String[] names = Utilities.getDbTableName(tableName);
+ dropTable(names[0], names[1], true, true);
}
/**
@@ -918,7 +902,7 @@ public class Hive {
* thrown if the drop fails
*/
public void dropTable(String dbName, String tableName) throws HiveException {
- dropTable(dbName, tableName, true, true, false);
+ dropTable(dbName, tableName, true, true);
}
/**
@@ -929,31 +913,14 @@ public class Hive {
* @param deleteData
* deletes the underlying data along with metadata
* @param ignoreUnknownTab
- * an exception is thrown if this is false and the table doesn't exist
+ * an exception if thrown if this is falser and table doesn't exist
* @throws HiveException
*/
public void dropTable(String dbName, String tableName, boolean deleteData,
boolean ignoreUnknownTab) throws HiveException {
- dropTable(dbName, tableName, deleteData, ignoreUnknownTab, false);
- }
- /**
- * Drops the table.
- *
- * @param dbName
- * @param tableName
- * @param deleteData
- * deletes the underlying data along with metadata
- * @param ignoreUnknownTab
- * an exception is thrown if this is false and the table doesn't exist
- * @param ifPurge
- * completely purge the table skipping trash while removing data from warehouse
- * @throws HiveException
- */
- public void dropTable(String dbName, String tableName, boolean deleteData,
- boolean ignoreUnknownTab, boolean ifPurge) throws HiveException {
try {
- getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab, ifPurge);
+ getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab);
} catch (NoSuchObjectException e) {
if (!ignoreUnknownTab) {
throw new HiveException(e);
@@ -1237,15 +1204,6 @@ public class Hive {
return getDatabase(currentDb);
}
- public void loadPartition(Path loadPath, String tableName,
- Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
- boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
- boolean isSrcLocal, boolean isAcid) throws HiveException {
- Table tbl = getTable(tableName);
- loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs,
- isSkewedStoreAsSubdir, isSrcLocal, isAcid);
- }
-
/**
* Load a directory into a Hive Table Partition - Alters existing content of
* the partition with the contents of loadPath. - If the partition does not
@@ -1254,7 +1212,7 @@ public class Hive {
*
* @param loadPath
* Directory containing files to load into Table
- * @param tbl
+ * @param tableName
* name of table to be loaded.
* @param partSpec
* defines which partition needs to be loaded
@@ -1267,12 +1225,12 @@ public class Hive {
* @param isSrcLocal
* If the source directory is LOCAL
*/
- public Partition loadPartition(Path loadPath, Table tbl,
+ public void loadPartition(Path loadPath, String tableName,
Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
boolean isSrcLocal, boolean isAcid) throws HiveException {
+ Table tbl = getTable(tableName);
Path tblDataLocationPath = tbl.getDataLocation();
- Partition newTPart = null;
try {
/**
* Move files before creating the partition since down stream processes
@@ -1321,10 +1279,10 @@ public class Hive {
Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
}
- boolean forceCreate = (!holdDDLTime) ? true : false;
- newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs);
// recreate the partition if it existed before
if (!holdDDLTime) {
+ Partition newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(),
+ inheritTableSpecs);
if (isSkewedStoreAsSubdir) {
org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
@@ -1334,9 +1292,9 @@ public class Hive {
/* Add list bucketing location mappings. */
skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
- alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
+ alterPartition(tbl.getTableName(), new Partition(tbl, newCreatedTpart));
newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
- return new Partition(tbl, newCreatedTpart);
+ newCreatedTpart = newTPart.getTPartition();
}
}
} catch (IOException e) {
@@ -1349,7 +1307,7 @@ public class Hive {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
}
- return newTPart;
+
}
/**
@@ -1445,18 +1403,18 @@ private void constructOneLBLocationMap(F
* @param replace
* @param numDP number of dynamic partitions
* @param holdDDLTime
- * @return partition map details (PartitionSpec and Partition)
+ * @return a list of strings with the dynamic partition paths
* @throws HiveException
*/
- public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
+ public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
String tableName, Map<String, String> partSpec, boolean replace,
int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid)
throws HiveException {
Set<Path> validPartitions = new HashSet<Path>();
try {
- Map<Map<String, String>, Partition> partitionsMap = new
- LinkedHashMap<Map<String, String>, Partition>();
+ ArrayList<LinkedHashMap<String, String>> fullPartSpecs =
+ new ArrayList<LinkedHashMap<String, String>>();
FileSystem fs = loadPath.getFileSystem(conf);
FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs);
@@ -1490,7 +1448,6 @@ private void constructOneLBLocationMap(F
+ " to at least " + validPartitions.size() + '.');
}
- Table tbl = getTable(tableName);
// for each dynamically created DP directory, construct a full partition spec
// and load the partition based on that
Iterator<Path> iter = validPartitions.iterator();
@@ -1503,12 +1460,14 @@ private void constructOneLBLocationMap(F
// generate a full partition specification
LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
Warehouse.makeSpecFromName(fullPartSpec, partPath);
- Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
- holdDDLTime, true, listBucketingEnabled, false, isAcid);
- partitionsMap.put(fullPartSpec, newPartition);
+ fullPartSpecs.add(fullPartSpec);
+
+ // finally load the partition -- move the file to the final table address
+ loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
+ listBucketingEnabled, false, isAcid);
LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
}
- return partitionsMap;
+ return fullPartSpecs;
} catch (IOException e) {
throw new HiveException(e);
}
@@ -1777,7 +1736,6 @@ private void constructOneLBLocationMap(F
public List<Partition> dropPartitions(String dbName, String tblName,
List<DropTableDesc.PartSpec> partSpecs, boolean deleteData, boolean ignoreProtection,
boolean ifExists) throws HiveException {
- //TODO: add support for ifPurge
try {
Table tbl = getTable(dbName, tblName);
List<ObjectPair<Integer, byte[]>> partExprs =
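With the ifPurge overloads removed on this branch, dropping by a single qualified name resolves the database and table and always takes the default, trash-respecting delete path. A minimal usage sketch (the table name is illustrative):

    Hive db = Hive.get(conf);
    // "mydb.mytable" is split by Utilities.getDbTableName into {"mydb", "mytable"}:
    db.dropTable("mydb.mytable");
    // equivalent explicit form, with deleteData and ignoreUnknownTab fixed to true:
    db.dropTable("mydb", "mytable", true, true);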
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Mon Oct 6 03:44:13 2014
@@ -451,11 +451,7 @@ public class SessionHiveMetaStoreClient
// Delete table data
if (deleteData && !MetaStoreUtils.isExternalTable(table)) {
try {
- boolean ifPurge = false;
- if (envContext != null){
- ifPurge = Boolean.parseBoolean(envContext.getProperties().get("ifPurge"));
- }
- getWh().deleteDir(tablePath, true, ifPurge);
+ getWh().deleteDir(tablePath, true);
} catch (Exception err) {
LOG.error("Failed to delete temp table directory: " + tablePath, err);
// Forgive error
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Mon Oct 6 03:44:13 2014
@@ -670,15 +670,10 @@ public final class ConstantPropagateProc
cppCtx.getOpToConstantExprs().put(op, constants);
foldOperator(op, cppCtx);
List<ExprNodeDesc> colList = op.getConf().getColList();
- List<String> columnNames = op.getConf().getOutputColumnNames();
- Map<String, ExprNodeDesc> columnExprMap = op.getColumnExprMap();
if (colList != null) {
for (int i = 0; i < colList.size(); i++) {
ExprNodeDesc newCol = foldExpr(colList.get(i), constants, cppCtx, op, 0, false);
colList.set(i, newCol);
- if (columnExprMap != null) {
- columnExprMap.put(columnNames.get(i), newCol);
- }
}
LOG.debug("New column list:(" + StringUtils.join(colList, " ") + ")");
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Mon Oct 6 03:44:13 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -30,17 +29,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
-import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MuxOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -48,16 +42,12 @@ import org.apache.hadoop.hive.ql.parse.O
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
-import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
-import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OpTraits;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.Statistics;
-import org.apache.hadoop.util.ReflectionUtils;
/**
* ConvertJoinMapJoin is an optimization that replaces a common join
@@ -70,46 +60,39 @@ public class ConvertJoinMapJoin implemen
static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
- @SuppressWarnings("unchecked")
@Override
- /*
- * (non-Javadoc) we should ideally not modify the tree we traverse. However,
- * since we need to walk the tree at any time when we modify the operator, we
- * might as well do it here.
- */
- public Object
- process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
- throws SemanticException {
+ /*
+ * (non-Javadoc)
+ * we should ideally not modify the tree we traverse.
+ * However, since we need to walk the tree at any time when we modify the
+ * operator, we might as well do it here.
+ */
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
- JoinOperator joinOp = (JoinOperator) nd;
-
- if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
- && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
- // we are just converting to a common merge join operator. The shuffle
- // join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+ if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
return null;
}
- // if we have traits, and table info is present in the traits, we know the
+ JoinOperator joinOp = (JoinOperator) nd;
+ // if we have traits, and table info is present in the traits, we know the
// exact number of buckets. Else choose the largest number of estimated
// reducers from the parent operators.
int numBuckets = -1;
int estimatedBuckets = -1;
- TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
if (parentOp.getOpTraits().getNumBuckets() > 0) {
- numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
- parentOp.getOpTraits().getNumBuckets() : numBuckets;
+ numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
+ parentOp.getOpTraits().getNumBuckets() : numBuckets;
}
if (parentOp instanceof ReduceSinkOperator) {
ReduceSinkOperator rs = (ReduceSinkOperator)parentOp;
- estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
+ estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
rs.getConf().getNumReducers() : estimatedBuckets;
}
}
@@ -124,80 +107,29 @@ public class ConvertJoinMapJoin implemen
numBuckets = 1;
}
LOG.info("Estimated number of buckets " + numBuckets);
- int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
+ int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets);
if (mapJoinConversionPos < 0) {
- // we cannot convert to bucket map join, we cannot convert to
- // map join either based on the size. Check if we can convert to SMB join.
- if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
- convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
- return null;
- }
- Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
- try {
- bigTableMatcherClass =
- (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
- context.parseContext.getConf(),
- HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
- } catch (ClassNotFoundException e) {
- throw new SemanticException(e.getMessage());
- }
-
- BigTableSelectorForAutoSMJ bigTableMatcher =
- ReflectionUtils.newInstance(bigTableMatcherClass, null);
- JoinDesc joinDesc = joinOp.getConf();
- JoinCondDesc[] joinCondns = joinDesc.getConds();
- Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
- if (joinCandidates.isEmpty()) {
- // This is a full outer join. This can never be a map-join
- // of any type. So return false.
- return false;
- }
- mapJoinConversionPos =
- bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
- if (mapJoinConversionPos < 0) {
- // contains aliases from sub-query
- // we are just converting to a common merge join operator. The shuffle
- // join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
- return null;
- }
-
- if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
- convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
- tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
- } else {
- // we are just converting to a common merge join operator. The shuffle
- // join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
- }
+ // we cannot convert to bucket map join, we cannot convert to
+ // map join either based on the size
return null;
}
- if (numBuckets > 1) {
- if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
- if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
- return null;
- }
+ if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
+ if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) {
+ return null;
}
}
LOG.info("Convert to non-bucketed map join");
// check if we can convert to map join no bucket scaling.
- mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
+ mapJoinConversionPos = mapJoinConversionPos(joinOp, context, 1);
if (mapJoinConversionPos < 0) {
- // we are just converting to a common merge join operator. The shuffle
- // join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
return null;
}
MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
// map join operator by default has no bucket cols
- mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
- mapJoinOp.setStatistics(joinOp.getStatistics());
+ mapJoinOp.setOpTraits(new OpTraits(null, -1));
// propagate this change till the next RS
for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
setAllChildrenTraitsToNull(childOp);
@@ -206,107 +138,11 @@ public class ConvertJoinMapJoin implemen
return null;
}
- // replaces the join operator with a new CommonJoinOperator, removes the
- // parent reduce sinks
- private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
- int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
- throws SemanticException {
- ParseContext parseContext = context.parseContext;
- MapJoinDesc mapJoinDesc = null;
- if (adjustParentsChildren) {
- mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
- joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
- } else {
- JoinDesc joinDesc = joinOp.getConf();
- // retain the original join desc in the map join.
- mapJoinDesc =
- new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
- joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
- joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
- }
-
- @SuppressWarnings("unchecked")
- CommonMergeJoinOperator mergeJoinOp =
- (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
- isSubQuery, mapJoinConversionPos, mapJoinDesc));
- OpTraits opTraits =
- new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
- .getSortCols());
- mergeJoinOp.setOpTraits(opTraits);
- mergeJoinOp.setStatistics(joinOp.getStatistics());
-
- for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
- int pos = parentOp.getChildOperators().indexOf(joinOp);
- parentOp.getChildOperators().remove(pos);
- parentOp.getChildOperators().add(pos, mergeJoinOp);
- }
-
- for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) {
- int pos = childOp.getParentOperators().indexOf(joinOp);
- childOp.getParentOperators().remove(pos);
- childOp.getParentOperators().add(pos, mergeJoinOp);
- }
-
- List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators();
- if (childOperators == null) {
- childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
- mergeJoinOp.setChildOperators(childOperators);
- }
-
- List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators();
- if (parentOperators == null) {
- parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
- mergeJoinOp.setParentOperators(parentOperators);
- }
-
- childOperators.clear();
- parentOperators.clear();
- childOperators.addAll(joinOp.getChildOperators());
- parentOperators.addAll(joinOp.getParentOperators());
- mergeJoinOp.getConf().setGenJoinKeys(false);
-
- if (adjustParentsChildren) {
- mergeJoinOp.getConf().setGenJoinKeys(true);
- List<Operator<? extends OperatorDesc>> newParentOpList =
- new ArrayList<Operator<? extends OperatorDesc>>();
- for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) {
- for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
- grandParentOp.getChildOperators().remove(parentOp);
- grandParentOp.getChildOperators().add(mergeJoinOp);
- newParentOpList.add(grandParentOp);
- }
- }
- mergeJoinOp.getParentOperators().clear();
- mergeJoinOp.getParentOperators().addAll(newParentOpList);
- List<Operator<? extends OperatorDesc>> parentOps =
- new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators());
- for (Operator<? extends OperatorDesc> parentOp : parentOps) {
- int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp);
- if (parentIndex == mapJoinConversionPos) {
- continue;
- }
-
- // insert the dummy store operator here
- DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
- dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
- dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
- dummyStoreOp.getChildOperators().add(mergeJoinOp);
- int index = parentOp.getChildOperators().indexOf(mergeJoinOp);
- parentOp.getChildOperators().remove(index);
- parentOp.getChildOperators().add(index, dummyStoreOp);
- dummyStoreOp.getParentOperators().add(parentOp);
- mergeJoinOp.getParentOperators().remove(parentIndex);
- mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
- }
- }
- mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
- }
-
private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) {
if (currentOp instanceof ReduceSinkOperator) {
return;
}
- currentOp.setOpTraits(new OpTraits(null, -1, null));
+ currentOp.setOpTraits(new OpTraits(null, -1));
for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
break;
@@ -315,26 +151,28 @@ public class ConvertJoinMapJoin implemen
}
}
- private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
- int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ int bigTablePosition) throws SemanticException {
+
+ TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) {
LOG.info("Check conversion to bucket map join failed.");
return false;
}
- MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
+ MapJoinOperator mapJoinOp =
+ convertJoinMapJoin(joinOp, context, bigTablePosition);
MapJoinDesc joinDesc = mapJoinOp.getConf();
joinDesc.setBucketMapJoin(true);
// we can set the traits for this join operator
OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
- tezBucketJoinProcCtx.getNumBuckets(), null);
+ tezBucketJoinProcCtx.getNumBuckets());
mapJoinOp.setOpTraits(opTraits);
- mapJoinOp.setStatistics(joinOp.getStatistics());
setNumberOfBucketsOnChildren(mapJoinOp);
- // Once the conversion is done, we can set the partitioner to bucket cols on the small table
+ // Once the conversion is done, we can set the partitioner to bucket cols on the small table
Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>();
bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets());
joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping);
@@ -344,54 +182,6 @@ public class ConvertJoinMapJoin implemen
return true;
}
- /*
- * This method tries to convert a join to an SMB. This is done based on
- * traits. If the sorted by columns are the same as the join columns then, we
- * can convert the join to an SMB. Otherwise retain the bucket map join as it
- * is still more efficient than a regular join.
- */
- private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
- int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
-
- ReduceSinkOperator bigTableRS =
- (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
- int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits()
- .getNumBuckets();
-
- // the sort and bucket cols have to match on both sides for this
- // transformation of the join operation
- for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
- if (!(parentOp instanceof ReduceSinkOperator)) {
- // could be mux/demux operators. Currently not supported
- LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time.");
- return false;
- }
- ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
- if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
- .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) {
- LOG.info("We cannot convert to SMB because the sort column names do not match.");
- return false;
- }
-
- if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp
- .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx)
- == false) {
- LOG.info("We cannot convert to SMB because bucket column names do not match.");
- return false;
- }
- }
-
- boolean isSubQuery = false;
- if (numBuckets < 0) {
- isSubQuery = true;
- numBuckets = bigTableRS.getConf().getNumReducers();
- }
- tezBucketJoinProcCtx.setNumBuckets(numBuckets);
- tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
- LOG.info("We can convert the join to an SMB join.");
- return true;
- }
-
private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
int numBuckets = currentOp.getOpTraits().getNumBuckets();
for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
@@ -403,13 +193,15 @@ public class ConvertJoinMapJoin implemen
}
/*
- * If the parent reduce sink of the big table side has the same emit key cols
- * as its parent, we can create a bucket map join eliminating the reduce sink.
+ * We perform the following checks to see if we can convert to a bucket map join
+ * 1. If the parent reduce sink of the big table side has the same emit key cols as
+ * its parent, we can create a bucket map join eliminating the reduce sink.
+ * 2. If we have the table information, we can check the same way as in Mapreduce to
+ * determine if we can perform a Bucket Map Join.
*/
- private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
- OptimizeTezProcContext context, int bigTablePosition,
- TezBucketJoinProcCtx tezBucketJoinProcCtx)
- throws SemanticException {
+ private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
+ OptimizeTezProcContext context, int bigTablePosition,
+ TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
// bail on mux-operator because mux operator masks the emit keys of the
// constituent reduce sinks
if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) {
@@ -419,41 +211,14 @@ public class ConvertJoinMapJoin implemen
}
ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
- List<List<String>> parentColNames = rs.getOpTraits().getBucketColNames();
- Operator<? extends OperatorDesc> parentOfParent = rs.getParentOperators().get(0);
- List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames();
- int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
- // all keys matched.
- if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(),
- tezBucketJoinProcCtx) == false) {
- LOG.info("No info available to check for bucket map join. Cannot convert");
- return false;
- }
-
/*
* this is the case when the big table is a sub-query and is probably
- * already bucketed by the join column in say a group by operation
+ * already bucketed by the join column in say a group by operation
*/
- boolean isSubQuery = false;
- if (numBuckets < 0) {
- isSubQuery = true;
- numBuckets = rs.getConf().getNumReducers();
- }
- tezBucketJoinProcCtx.setNumBuckets(numBuckets);
- tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
- return true;
- }
-
- private boolean checkColEquality(List<List<String>> grandParentColNames,
- List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap,
- TezBucketJoinProcCtx tezBucketJoinProcCtx) {
-
- if ((grandParentColNames == null) || (parentColNames == null)) {
- return false;
- }
-
- if ((parentColNames != null) && (parentColNames.isEmpty() == false)) {
- for (List<String> listBucketCols : grandParentColNames) {
+ List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames();
+ if ((colNames != null) && (colNames.isEmpty() == false)) {
+ Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0);
+ for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) {
// can happen if this operator does not carry forward the previous bucketing columns
// for e.g. another join operator which does not carry one of the sides' key columns
if (listBucketCols.isEmpty()) {
@@ -461,9 +226,9 @@ public class ConvertJoinMapJoin implemen
}
int colCount = 0;
// parent op is guaranteed to have a single list because it is a reduce sink
- for (String colName : parentColNames.get(0)) {
+ for (String colName : rs.getOpTraits().getBucketColNames().get(0)) {
// all columns need to be at least a subset of the parentOfParent's bucket cols
- ExprNodeDesc exprNodeDesc = colExprMap.get(colName);
+ ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName);
if (exprNodeDesc instanceof ExprNodeColumnDesc) {
if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) {
colCount++;
@@ -471,21 +236,32 @@ public class ConvertJoinMapJoin implemen
break;
}
}
-
- if (colCount == parentColNames.get(0).size()) {
+
+ if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) {
+ // all keys matched.
+ int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
+ boolean isSubQuery = false;
+ if (numBuckets < 0) {
+ isSubQuery = true;
+ numBuckets = rs.getConf().getNumReducers();
+ }
+ tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+ tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
return true;
}
}
}
return false;
}
+
+ LOG.info("No info available to check for bucket map join. Cannot convert");
return false;
}
- public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
+ public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
int buckets) {
- Set<Integer> bigTableCandidateSet =
- MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
+ Set<Integer> bigTableCandidateSet = MapJoinProcessor.
+ getBigTableCandidates(joinOp.getConf().getConds());
long maxSize = context.conf.getLongVar(
HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
@@ -511,7 +287,7 @@ public class ConvertJoinMapJoin implemen
long inputSize = currInputStat.getDataSize();
if ((bigInputStat == null) ||
((bigInputStat != null) &&
- (inputSize > bigInputStat.getDataSize()))) {
+ (inputSize > bigInputStat.getDataSize()))) {
if (bigTableFound) {
// cannot convert to map join; we've already chosen a big table
@@ -571,9 +347,9 @@ public class ConvertJoinMapJoin implemen
* for tez.
*/
- public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
int bigTablePosition) throws SemanticException {
- // bail on mux operator because currently the mux operator masks the emit keys
+ // bail on mux operator because currently the mux operator masks the emit keys
// of the constituent reduce sinks.
for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
if (parentOp instanceof MuxOperator) {
@@ -583,12 +359,12 @@ public class ConvertJoinMapJoin implemen
//can safely convert the join to a map join.
ParseContext parseContext = context.parseContext;
- MapJoinOperator mapJoinOp =
- MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
- parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+ MapJoinOperator mapJoinOp = MapJoinProcessor.
+ convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(),
+ joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
- Operator<? extends OperatorDesc> parentBigTableOp =
- mapJoinOp.getParentOperators().get(bigTablePosition);
+ Operator<? extends OperatorDesc> parentBigTableOp
+ = mapJoinOp.getParentOperators().get(bigTablePosition);
if (parentBigTableOp instanceof ReduceSinkOperator) {
for (Operator<?> p : parentBigTableOp.getParentOperators()) {
// we might have generated a dynamic partition operator chain. Since
@@ -604,10 +380,11 @@ public class ConvertJoinMapJoin implemen
}
}
mapJoinOp.getParentOperators().remove(bigTablePosition);
- if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
+ if (!(mapJoinOp.getParentOperators().contains(
+ parentBigTableOp.getParentOperators().get(0)))) {
mapJoinOp.getParentOperators().add(bigTablePosition,
parentBigTableOp.getParentOperators().get(0));
- }
+ }
parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) {
if (!(op.getChildOperators().contains(mapJoinOp))) {
@@ -620,31 +397,15 @@ public class ConvertJoinMapJoin implemen
return mapJoinOp;
}
- private boolean hasDynamicPartitionBroadcast(Operator<?> parent) {
- boolean hasDynamicPartitionPruning = false;
-
- for (Operator<?> op: parent.getChildOperators()) {
- while (op != null) {
- if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
- // found dynamic partition pruning operator
- hasDynamicPartitionPruning = true;
- break;
- }
-
- if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
- // crossing reduce sink or file sink means the pruning isn't for this parent.
- break;
- }
-
- if (op.getChildOperators().size() != 1) {
- // dynamic partition pruning pipeline doesn't have multiple children
- break;
- }
-
- op = op.getChildOperators().get(0);
+ private boolean hasDynamicPartitionBroadcast(Operator<?> op) {
+ if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+ return true;
+ }
+ for (Operator<?> c : op.getChildOperators()) {
+ if (hasDynamicPartitionBroadcast(c)) {
+ return true;
}
}
-
- return hasDynamicPartitionPruning;
+ return false;
}
}
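The reverted hasDynamicPartitionBroadcast above is now a plain depth-first search: it answers whether any operator in the subtree is an AppMasterEventOperator carrying a DynamicPruningEventDesc. A self-contained sketch of the same shape, with Node and the dppEvent flag as hypothetical stand-ins for the operator tree and that test:

    import java.util.ArrayList;
    import java.util.List;

    class Node {
      final List<Node> children = new ArrayList<Node>();
      boolean dppEvent;  // stand-in for AppMasterEventOperator + DynamicPruningEventDesc

      static boolean hasBroadcast(Node op) {
        if (op.dppEvent) {
          return true;               // found a dynamic partition pruning event
        }
        for (Node child : op.children) {
          if (hasBroadcast(child)) {
            return true;             // a match anywhere in the subtree counts
          }
        }
        return false;
      }
    }

Unlike the removed version, this walks the whole subtree without stopping at reduce sinks, file sinks, or multi-child operators, which is the behavioral difference the hunk shows.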
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Oct 6 03:44:13 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import com.google.common.collect.Interner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -40,6 +39,8 @@ import org.apache.hadoop.hive.ql.exec.No
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator;
+import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -100,6 +101,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import java.io.Serializable;
@@ -578,6 +580,8 @@ public final class GenMapRedUtils {
//This read entity is a direct read entity and not an indirect read (that is when
// this is being read because it is a dependency of a view).
boolean isDirectRead = (parentViewInfo == null);
+ PlanUtils.addInput(inputs,
+ new ReadEntity(parseCtx.getTopToTable().get(topOp), parentViewInfo, isDirectRead));
for (Partition part : parts) {
if (part.getTable().isPartitioned()) {
@@ -878,30 +882,6 @@ public final class GenMapRedUtils {
}
}
- public static void internTableDesc(Task<?> task, Interner<TableDesc> interner) {
-
- if (task instanceof ConditionalTask) {
- for (Task tsk : ((ConditionalTask) task).getListTasks()) {
- internTableDesc(tsk, interner);
- }
- } else if (task instanceof ExecDriver) {
- MapredWork work = (MapredWork) task.getWork();
- work.getMapWork().internTable(interner);
- } else if (task != null && (task.getWork() instanceof TezWork)) {
- TezWork work = (TezWork)task.getWork();
- for (BaseWork w : work.getAllWorkUnsorted()) {
- if (w instanceof MapWork) {
- ((MapWork)w).internTable(interner);
- }
- }
- }
- if (task.getNumChild() > 0) {
- for (Task childTask : task.getChildTasks()) {
- internTableDesc(childTask, interner);
- }
- }
- }
-
/**
* create a new plan and return.
*
@@ -1527,7 +1507,7 @@ public final class GenMapRedUtils {
*
* @param fsInputDesc
* @param finalName
- * @param inputFormatClass
+ * @param inputFormatClass
* @return MergeWork if table is stored as RCFile or ORCFile,
* null otherwise
*/
@@ -1734,7 +1714,7 @@ public final class GenMapRedUtils {
// There are separate configuration parameters to control whether to
// merge for a map-only job
// or for a map-reduce job
- if (currTask.getWork() instanceof MapredWork) {
+ if (currTask.getWork() instanceof MapredWork) {
ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork();
boolean mergeMapOnly =
hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
@@ -1833,7 +1813,7 @@ public final class GenMapRedUtils {
return Collections.emptyList();
}
- public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
+ public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
throws SemanticException {
List<Path> inputPaths = new ArrayList<Path>();
switch (parseInfo.getTableSpec().specType) {
@@ -1870,7 +1850,6 @@ public final class GenMapRedUtils {
public static Set<Operator<?>> findTopOps(Operator<?> startOp, final Class<?> clazz) {
final Set<Operator<?>> operators = new LinkedHashSet<Operator<?>>();
OperatorUtils.iterateParents(startOp, new NodeUtils.Function<Operator<?>>() {
- @Override
public void apply(Operator<?> argument) {
if (argument.getNumParent() == 0 && (clazz == null || clazz.isInstance(argument))) {
operators.add(argument);
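For context on the removed internTableDesc helper above: it walked conditional, map-reduce, and Tez tasks and used a Guava Interner to share identical TableDesc instances across MapWork objects. A minimal illustration of the Guava API it relied on (String payload purely for demonstration):

    import com.google.common.collect.Interner;
    import com.google.common.collect.Interners;

    Interner<String> interner = Interners.newWeakInterner();
    String a = interner.intern(new String("tabledesc"));
    String b = interner.intern(new String("tabledesc"));
    // a == b is true: both callers share one canonical instance, which is
    // the memory saving that interning the TableDescs provided.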
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Oct 6 03:44:13 2014
@@ -389,8 +389,157 @@ public class MapJoinProcessor implements
JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
throws SemanticException {
- MapJoinDesc mapJoinDescriptor =
- getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin);
+ JoinDesc desc = op.getConf();
+ JoinCondDesc[] condns = desc.getConds();
+ Byte[] tagOrder = desc.getTagOrder();
+
+ // outer join cannot be performed on a table which is being cached
+ if (!noCheckOuterJoin) {
+ if (checkMapJoin(mapJoinPos, condns) < 0) {
+ throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+ }
+ }
+
+ // Walk over all the sources (which are guaranteed to be reduce sink
+ // operators).
+ // The join outputs a concatenation of all the inputs.
+ QBJoinTree leftSrc = joinTree.getJoinSrc();
+ List<ReduceSinkOperator> oldReduceSinkParentOps =
+ new ArrayList<ReduceSinkOperator>(op.getNumParent());
+ if (leftSrc != null) {
+ // assert mapJoinPos == 0;
+ Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
+ assert parentOp.getParentOperators().size() == 1;
+ oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+ }
+
+ byte pos = 0;
+ for (String src : joinTree.getBaseSrc()) {
+ if (src != null) {
+ Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
+ assert parentOp.getParentOperators().size() == 1;
+ oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+ }
+ pos++;
+ }
+
+ Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
+ List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
+ Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
+ Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
+ for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
+ byte tag = entry.getKey();
+ Operator<?> terminal = oldReduceSinkParentOps.get(tag);
+
+ List<ExprNodeDesc> values = entry.getValue();
+ List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
+ newValueExprs.put(tag, newValues);
+ for (int i = 0; i < schema.size(); i++) {
+ ColumnInfo column = schema.get(i);
+ if (column == null) {
+ continue;
+ }
+ ExprNodeDesc expr = colExprMap.get(column.getInternalName());
+ int index = ExprNodeDescUtils.indexOf(expr, values);
+ if (index >= 0) {
+ colExprMap.put(column.getInternalName(), newValues.get(index));
+ schema.set(i, null);
+ }
+ }
+ }
+
+ // rewrite value index for mapjoin
+ Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
+
+ // get the join keys from old parent ReduceSink operators
+ Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
+ // construct valueTableDescs and valueFilteredTableDescs
+ List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
+ List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
+ int[][] filterMap = desc.getFilterMap();
+ for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+ ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
+ List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
+ List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+ if (pos != mapJoinPos) {
+ // remove values in key exprs for value table schema
+ // value expression for hashsink will be modified in LocalMapJoinProcessor
+ int[] valueIndex = new int[valueCols.size()];
+ List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
+ for (int i = 0; i < valueIndex.length; i++) {
+ ExprNodeDesc expr = valueCols.get(i);
+ int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
+ if (kindex >= 0) {
+ valueIndex[i] = kindex;
+ } else {
+ valueIndex[i] = -valueColsInValueExpr.size() - 1;
+ valueColsInValueExpr.add(expr);
+ }
+ }
+ if (needValueIndex(valueIndex)) {
+ valueIndices.put(pos, valueIndex);
+ }
+ valueCols = valueColsInValueExpr;
+ }
+ // deep copy expr node desc
+ List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
+ if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
+ ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory
+ .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false);
+ valueFilteredCols.add(isFilterDesc);
+ }
+
+ TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
+ .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
+ TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
+ .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue"));
+
+ valueTableDescs.add(valueTableDesc);
+ valueFilteredTableDescs.add(valueFilteredTableDesc);
+
+ keyExprMap.put(pos, keyCols);
+ }
+
+ Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
+ Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
+ for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
+ byte srcTag = entry.getKey();
+ List<ExprNodeDesc> filter = entry.getValue();
+
+ Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
+ newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
+ }
+ desc.setFilters(filters = newFilters);
+
+ // create dumpfile prefix needed to create descriptor
+ String dumpFilePrefix = "";
+ if (joinTree.getMapAliases() != null) {
+ for (String mapAlias : joinTree.getMapAliases()) {
+ dumpFilePrefix = dumpFilePrefix + mapAlias;
+ }
+ dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+ } else {
+ dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+ }
+
+ List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos);
+
+ List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+ TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
+ PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+ JoinCondDesc[] joinCondns = op.getConf().getConds();
+ MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
+ valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns,
+ filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
+ mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
+ mapJoinDescriptor.setTagOrder(tagOrder);
+ mapJoinDescriptor.setNullSafes(desc.getNullSafes());
+ mapJoinDescriptor.setFilterMap(desc.getFilterMap());
+ if (!valueIndices.isEmpty()) {
+ mapJoinDescriptor.setValueIndices(valueIndices);
+ }
// reduce sink row resolver used to generate map join op
RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -402,7 +551,6 @@ public class MapJoinProcessor implements
opParseCtxMap.put(mapJoinOp, ctx);
mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
- Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
mapJoinOp.setColumnExprMap(colExprMap);
List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
@@ -1028,168 +1176,4 @@ public class MapJoinProcessor implements
}
}
-
- public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
- LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
- JoinDesc desc = op.getConf();
- JoinCondDesc[] condns = desc.getConds();
- Byte[] tagOrder = desc.getTagOrder();
-
- // outer join cannot be performed on a table which is being cached
- if (!noCheckOuterJoin) {
- if (checkMapJoin(mapJoinPos, condns) < 0) {
- throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
- }
- }
-
- // Walk over all the sources (which are guaranteed to be reduce sink
- // operators).
- // The join outputs a concatenation of all the inputs.
- QBJoinTree leftSrc = joinTree.getJoinSrc();
- List<ReduceSinkOperator> oldReduceSinkParentOps =
- new ArrayList<ReduceSinkOperator>(op.getNumParent());
- if (leftSrc != null) {
- // assert mapJoinPos == 0;
- Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
- assert parentOp.getParentOperators().size() == 1;
- oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
- }
-
- byte pos = 0;
- for (String src : joinTree.getBaseSrc()) {
- if (src != null) {
- Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
- assert parentOp.getParentOperators().size() == 1;
- oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
- }
- pos++;
- }
-
- Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
- List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
- Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
- Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
- for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
- byte tag = entry.getKey();
- Operator<?> terminal = oldReduceSinkParentOps.get(tag);
-
- List<ExprNodeDesc> values = entry.getValue();
- List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
- newValueExprs.put(tag, newValues);
- for (int i = 0; i < schema.size(); i++) {
- ColumnInfo column = schema.get(i);
- if (column == null) {
- continue;
- }
- ExprNodeDesc expr = colExprMap.get(column.getInternalName());
- int index = ExprNodeDescUtils.indexOf(expr, values);
- if (index >= 0) {
- colExprMap.put(column.getInternalName(), newValues.get(index));
- schema.set(i, null);
- }
- }
- }
-
- // rewrite value index for mapjoin
- Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
-
- // get the join keys from old parent ReduceSink operators
- Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-
- // construct valueTableDescs and valueFilteredTableDescs
- List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
- List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
- int[][] filterMap = desc.getFilterMap();
- for (pos = 0; pos < op.getParentOperators().size(); pos++) {
- ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
- List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
- List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
- if (pos != mapJoinPos) {
- // remove values in key exprs for value table schema
- // value expression for hashsink will be modified in
- // LocalMapJoinProcessor
- int[] valueIndex = new int[valueCols.size()];
- List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
- for (int i = 0; i < valueIndex.length; i++) {
- ExprNodeDesc expr = valueCols.get(i);
- int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
- if (kindex >= 0) {
- valueIndex[i] = kindex;
- } else {
- valueIndex[i] = -valueColsInValueExpr.size() - 1;
- valueColsInValueExpr.add(expr);
- }
- }
- if (needValueIndex(valueIndex)) {
- valueIndices.put(pos, valueIndex);
- }
- valueCols = valueColsInValueExpr;
- }
- // deep copy expr node desc
- List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
- if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
- ExprNodeColumnDesc isFilterDesc =
- new ExprNodeColumnDesc(
- TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter",
- "filter", false);
- valueFilteredCols.add(isFilterDesc);
- }
-
- TableDesc valueTableDesc =
- PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols,
- "mapjoinvalue"));
- TableDesc valueFilteredTableDesc =
- PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
- valueFilteredCols, "mapjoinvalue"));
-
- valueTableDescs.add(valueTableDesc);
- valueFilteredTableDescs.add(valueFilteredTableDesc);
-
- keyExprMap.put(pos, keyCols);
- }
-
- Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
- Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
- for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
- byte srcTag = entry.getKey();
- List<ExprNodeDesc> filter = entry.getValue();
-
- Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
- newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
- }
- desc.setFilters(filters = newFilters);
-
- // create dumpfile prefix needed to create descriptor
- String dumpFilePrefix = "";
- if (joinTree.getMapAliases() != null) {
- for (String mapAlias : joinTree.getMapAliases()) {
- dumpFilePrefix = dumpFilePrefix + mapAlias;
- }
- dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
- } else {
- dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
- }
-
- List<ExprNodeDesc> keyCols = keyExprMap.get((byte) mapJoinPos);
-
- List<String> outputColumnNames = op.getConf().getOutputColumnNames();
- TableDesc keyTableDesc =
- PlanUtils.getMapJoinKeyTableDesc(hconf,
- PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
- JoinCondDesc[] joinCondns = op.getConf().getConds();
- MapJoinDesc mapJoinDescriptor =
- new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
- valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op
- .getConf().getNoOuterJoin(), dumpFilePrefix);
- mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
- mapJoinDescriptor.setTagOrder(tagOrder);
- mapJoinDescriptor.setNullSafes(desc.getNullSafes());
- mapJoinDescriptor.setFilterMap(desc.getFilterMap());
- if (!valueIndices.isEmpty()) {
- mapJoinDescriptor.setValueIndices(valueIndices);
- }
-
- return mapJoinDescriptor;
- }
}
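
The MapJoinProcessor change above inlines the former getMapJoinDesc body into convertMapJoin. The most intricate piece is the value-index rewrite: value columns that duplicate join keys are dropped from the value table schema, and an index array records where each value can be recovered at run time. Below is a self-contained sketch of that encoding, using plain strings in place of ExprNodeDesc and List.indexOf in place of ExprNodeDescUtils.indexOf; needValueIndex is not shown in this hunk, so the body given here is assumed semantics (true only when something was actually folded into the keys).

    import java.util.ArrayList;
    import java.util.List;

    public class ValueIndexSketch {

      // For each value expression, record where to read it at join time:
      //   index >= 0 -> position in the key columns (not shipped twice)
      //   index <  0 -> position (-index - 1) in the trimmed value columns
      static int[] buildValueIndex(List<String> keyCols, List<String> valueCols,
          List<String> trimmedValues) {
        int[] valueIndex = new int[valueCols.size()];
        for (int i = 0; i < valueCols.size(); i++) {
          int kindex = keyCols.indexOf(valueCols.get(i));
          if (kindex >= 0) {
            valueIndex[i] = kindex;
          } else {
            valueIndex[i] = -trimmedValues.size() - 1;
            trimmedValues.add(valueCols.get(i));
          }
        }
        return valueIndex;
      }

      // Assumed semantics: keep the index only when at least one value column
      // was folded into the keys, i.e. the mapping is not the identity
      // -1, -2, -3, ... over the trimmed list.
      static boolean needValueIndex(int[] valueIndex) {
        for (int i = 0; i < valueIndex.length; i++) {
          if (valueIndex[i] != -i - 1) {
            return true;
          }
        }
        return false;
      }
    }

For keys [a, b] and values [b, c], buildValueIndex returns [1, -1]: b is read from key slot 1 and only c stays in the hash table payload.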
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Mon Oct 6 03:44:13 2014
@@ -51,12 +51,7 @@ public class Optimizer {
* @param hiveConf
*/
public void initialize(HiveConf hiveConf) {
-
- boolean isTezExecEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
- boolean bucketMapJoinOptimizer = false;
-
transformations = new ArrayList<Transform>();
-
// Add the transformation that computes the lineage information.
transformations.add(new Generator());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
@@ -86,16 +81,15 @@ public class Optimizer {
}
transformations.add(new SamplePruner());
transformations.add(new MapJoinProcessor());
-
- if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) && !isTezExecEngine) {
+ boolean bucketMapJoinOptimizer = false;
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
transformations.add(new BucketMapJoinOptimizer());
bucketMapJoinOptimizer = true;
}
// If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both
// BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
- if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN))
- && !isTezExecEngine) {
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
if (!bucketMapJoinOptimizer) {
// No need to add BucketMapJoinOptimizer twice
transformations.add(new BucketMapJoinOptimizer());
@@ -125,7 +119,7 @@ public class Optimizer {
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) &&
- !isTezExecEngine) {
+ !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
transformations.add(new CorrelationOptimizer());
}
if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
@@ -135,7 +129,7 @@ public class Optimizer {
transformations.add(new StatsOptimizer());
}
String execEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
- if (pctx.getContext().getExplain() && !"spark".equals(execEngine) && !"tez".equals(execEngine)) {
+ if ((pctx.getContext().getExplain() || "spark".equals(execEngine)) && !"tez".equals(execEngine)) {
transformations.add(new AnnotateWithStatistics());
transformations.add(new AnnotateWithOpTraits());
}
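
The last Optimizer.java hunk changes the stats-annotation gate in two directions at once: previously AnnotateWithStatistics and AnnotateWithOpTraits ran only for EXPLAIN, and never on Spark or Tez; now they run for EXPLAIN or whenever Spark is the engine, with Tez still excluded. The predicate, lifted into a helper for readability (illustrative class and method names, same boolean as the '+' line):

    public class StatsAnnotationGate {
      // (isExplain || spark) && !tez -- Spark plans are now annotated with
      // statistics and operator traits even outside EXPLAIN.
      static boolean shouldAnnotate(boolean isExplain, String execEngine) {
        return (isExplain || "spark".equals(execEngine))
            && !"tez".equals(execEngine);
      }
    }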
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Mon Oct 6 03:44:13 2014
@@ -52,7 +52,6 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
public class ReduceSinkMapJoinProc implements NodeProcessor {
@@ -184,10 +183,7 @@ public class ReduceSinkMapJoinProc imple
TezWork tezWork = context.currentTask.getWork();
LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
tezWork.connect(parentWork, myWork, edgeProp);
- if (edgeType == EdgeType.CUSTOM_EDGE) {
- tezWork.setVertexType(myWork, VertexType.INITIALIZED_EDGES);
- }
-
+
ReduceSinkOperator r = null;
if (parentRS.getConf().getOutputName() != null) {
LOG.debug("Cloning reduce sink for multi-child broadcast edge");
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Mon Oct 6 03:44:13 2014
@@ -44,9 +44,9 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
-import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -55,25 +55,13 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.QB;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.SplitSample;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -85,11 +73,9 @@ public class SimpleFetchOptimizer implem
private final Log LOG = LogFactory.getLog(SimpleFetchOptimizer.class.getName());
- @Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
- if (pctx.getQB().getIsQuery() && !pctx.getQB().getParseInfo().isAnalyzeCommand()
- && topOps.size() == 1) {
+ if (pctx.getQB().isSimpleSelectQuery() && topOps.size() == 1) {
// no join, no groupby, no distinct, no lateral view, no subq,
// no CTAS or insert, not analyze command, and single sourced.
String alias = (String) pctx.getTopOps().keySet().toArray()[0];
@@ -158,7 +144,7 @@ public class SimpleFetchOptimizer implem
// for non-aggressive mode (minimal)
// 1. sampling is not allowed
// 2. for partitioned table, all filters should be targeted to partition column
- // 3. SelectOperator should use only simple cast/column access
+ // 3. SelectOperator should be select star
private FetchData checkTree(boolean aggressive, ParseContext pctx, String alias,
TableScanOperator ts) throws HiveException {
SplitSample splitSample = pctx.getNameToSplitSample().get(alias);
@@ -170,7 +156,7 @@ public class SimpleFetchOptimizer implem
return null;
}
- Table table = pctx.getTopToTable().get(ts);
+ Table table = qb.getMetaData().getAliasToTable().get(alias);
if (table == null) {
return null;
}
@@ -195,71 +181,34 @@ public class SimpleFetchOptimizer implem
return null;
}
- private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggressive,
+ private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggressive,
boolean bypassFilter) {
if (ts.getChildOperators().size() != 1) {
return null;
}
Operator<?> op = ts.getChildOperators().get(0);
for (; ; op = op.getChildOperators().get(0)) {
- if (op instanceof SelectOperator) {
- if (!aggressive) {
- if (!checkExpressions((SelectOperator) op)) {
- break;
- }
- }
- continue;
- }
-
- if (aggressive) {
- if (!(op instanceof LimitOperator || op instanceof FilterOperator)) {
+ if (aggressive) {
+ if (!(op instanceof LimitOperator || op instanceof FilterOperator
+ || op instanceof SelectOperator)) {
break;
}
- } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) {
+ } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)
+ || (op instanceof SelectOperator && ((SelectOperator) op).getConf().isSelectStar()))) {
break;
}
-
if (op.getChildOperators() == null || op.getChildOperators().size() != 1) {
return null;
}
}
-
if (op instanceof FileSinkOperator) {
fetch.scanOp = ts;
fetch.fileSink = op;
return fetch;
}
-
return null;
}
- private boolean checkExpressions(SelectOperator op) {
- SelectDesc desc = op.getConf();
- for (ExprNodeDesc expr : desc.getColList()) {
- if (!checkExpression(expr)) {
- return false;
- }
- }
- return true;
- }
-
- private boolean checkExpression(ExprNodeDesc expr) {
- if (expr instanceof ExprNodeConstantDesc || expr instanceof ExprNodeColumnDesc) {
- return true;
- }
-
- if (expr instanceof ExprNodeGenericFuncDesc) {
- GenericUDF udf = ((ExprNodeGenericFuncDesc) expr).getGenericUDF();
- if (udf instanceof GenericUDFToBinary || udf instanceof GenericUDFToChar
- || udf instanceof GenericUDFToDate || udf instanceof GenericUDFToDecimal
- || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp
- || udf instanceof GenericUDFToVarchar) {
- return expr.getChildren().size() == 1 && checkExpression(expr.getChildren().get(0));
- }
- }
- return false;
- }
-
private class FetchData {
private final ReadEntity parent;
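
The hunk above reverts the non-aggressive fetch path to select-star only: the checkExpressions/checkExpression pair that had whitelisted simple casts (GenericUDFToChar, GenericUDFToVarchar, and friends) is removed, and the SelectOperator test moves back into the operator allow-list. A self-contained sketch of that chain walk, with an enum standing in for the operator classes; Hive's real check uses instanceof plus SelectDesc.isSelectStar(), so the names below are illustrative.

    import java.util.List;

    public class FetchChainSketch {
      // Minimal operator stand-in: a kind plus a single-child chain.
      static class Op {
        enum Kind { SELECT_STAR, SELECT_EXPR, FILTER, LIMIT, FILE_SINK }
        final Kind kind;
        final List<Op> children;
        Op(Kind kind, List<Op> children) {
          this.kind = kind;
          this.children = children;
        }
      }

      // Walk the chain below a table scan: convertible to a simple fetch only
      // if every intermediate operator is allowed and the chain ends in a
      // file sink, mirroring checkOperators in the diff.
      static boolean convertible(Op op, boolean aggressive, boolean bypassFilter) {
        for (;; op = op.children.get(0)) {
          boolean allowed = aggressive
              ? (op.kind == Op.Kind.LIMIT || op.kind == Op.Kind.FILTER
                  || op.kind == Op.Kind.SELECT_STAR || op.kind == Op.Kind.SELECT_EXPR)
              : (op.kind == Op.Kind.LIMIT
                  || (op.kind == Op.Kind.FILTER && bypassFilter)
                  || op.kind == Op.Kind.SELECT_STAR);
          if (!allowed) {
            break;
          }
          if (op.children == null || op.children.size() != 1) {
            return false;
          }
        }
        return op.kind == Op.Kind.FILE_SINK;
      }
    }

With aggressive == false and bypassFilter == true, a SELECT_STAR -> LIMIT -> FILE_SINK chain converts, while any non-star projection now fails again.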
@@ -291,7 +240,7 @@ public class SimpleFetchOptimizer implem
this.splitSample = splitSample;
this.onlyPruningFilter = bypassFilter;
}
-
+
/*
* all filters were executed during partition pruning
*/
@@ -302,7 +251,7 @@ public class SimpleFetchOptimizer implem
private FetchWork convertToWork() throws HiveException {
inputs.clear();
if (!table.isPartitioned()) {
- inputs.add(new ReadEntity(table, parent, parent == null));
+ inputs.add(new ReadEntity(table, parent));
FetchWork work = new FetchWork(table.getPath(), Utilities.getTableDesc(table));
PlanUtils.configureInputJobPropertiesForStorageHandler(work.getTblDesc());
work.setSplitSample(splitSample);
@@ -312,12 +261,12 @@ public class SimpleFetchOptimizer implem
List<PartitionDesc> partP = new ArrayList<PartitionDesc>();
for (Partition partition : partsList.getNotDeniedPartns()) {
- inputs.add(new ReadEntity(partition, parent, parent == null));
+ inputs.add(new ReadEntity(partition, parent));
listP.add(partition.getDataLocation());
partP.add(Utilities.getPartitionDesc(partition));
}
Table sourceTable = partsList.getSourceTable();
- inputs.add(new ReadEntity(sourceTable, parent, parent == null));
+ inputs.add(new ReadEntity(sourceTable, parent));
TableDesc table = Utilities.getTableDesc(sourceTable);
FetchWork work = new FetchWork(listP, partP, table);
if (!work.getPartDesc().isEmpty()) {