Posted to commits@hive.apache.org by am...@apache.org on 2013/04/26 06:59:58 UTC
svn commit: r1476039 [8/22] - in /hive/branches/HIVE-4115: ./ beeline/
beeline/src/java/org/apache/hive/beeline/ bin/ builtins/ cli/
common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/
eclipse-templates/ hbase-handler/ hbase-handler/src/java...
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Fri Apr 26 04:59:50 2013
@@ -40,10 +40,10 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.Map.Entry;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
@@ -86,6 +86,8 @@ import org.apache.hadoop.hive.ql.hooks.R
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
@@ -111,8 +113,8 @@ import org.apache.hadoop.hive.ql.plan.Al
import org.apache.hadoop.hive.ql.plan.AlterIndexDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableAlterPartDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
import org.apache.hadoop.hive.ql.plan.AlterTableSimpleDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableExchangePartition;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.CreateIndexDesc;
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
@@ -150,6 +152,7 @@ import org.apache.hadoop.hive.ql.plan.Sh
import org.apache.hadoop.hive.ql.plan.SwitchDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.security.authorization.Privilege;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -429,6 +432,12 @@ public class DDLTask extends Task<DDLWor
return truncateTable(db, truncateTableDesc);
}
+ AlterTableExchangePartition alterTableExchangePartition =
+ work.getAlterTableExchangePartition();
+ if (alterTableExchangePartition != null) {
+ return exchangeTablePartition(db, alterTableExchangePartition);
+ }
+
} catch (InvalidTableException e) {
formatter.consoleError(console, "Table " + e.getTableName() + " does not exist",
formatter.MISSING);
@@ -3951,6 +3960,21 @@ public class DDLTask extends Task<DDLWor
}
private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws HiveException {
+
+ if (truncateTableDesc.getColumnIndexes() != null) {
+ ColumnTruncateWork truncateWork = new ColumnTruncateWork(
+ truncateTableDesc.getColumnIndexes(), truncateTableDesc.getInputDir(),
+ truncateTableDesc.getOutputDir());
+ truncateWork.setListBucketingCtx(truncateTableDesc.getLbCtx());
+ truncateWork.setMapperCannotSpanPartns(true);
+ DriverContext driverCxt = new DriverContext();
+ ColumnTruncateTask taskExec = new ColumnTruncateTask();
+ taskExec.initialize(db.getConf(), null, driverCxt);
+ taskExec.setWork(truncateWork);
+ taskExec.setQueryPlan(this.getQueryPlan());
+ return taskExec.execute(driverCxt);
+ }
+
String tableName = truncateTableDesc.getTableName();
Map<String, String> partSpec = truncateTableDesc.getPartSpec();
@@ -3969,6 +3993,17 @@ public class DDLTask extends Task<DDLWor
return 0;
}
+ private int exchangeTablePartition(Hive db,
+ AlterTableExchangePartition exchangePartition) throws HiveException {
+ Map<String, String> partitionSpecs = exchangePartition.getPartitionSpecs();
+ Table destTable = exchangePartition.getDestinationTable();
+ Table sourceTable = exchangePartition.getSourceTable();
+ db.exchangeTablePartitions(partitionSpecs, sourceTable.getDbName(),
+ sourceTable.getTableName(),destTable.getDbName(),
+ destTable.getTableName());
+ return 0;
+ }
+
private List<Path> getLocations(Hive db, Table table, Map<String, String> partSpec)
throws HiveException {
List<Path> locations = new ArrayList<Path>();
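
For context on the new exchangeTablePartition() branch: a minimal caller-side
sketch of the underlying metastore call, assuming the Hive object API shown in
the hunk above. Database and table names here are made up for illustration.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Hive;

    public class ExchangePartitionSketch {
      public static void main(String[] args) throws Exception {
        Hive db = Hive.get(new HiveConf());
        Map<String, String> partitionSpecs = new HashMap<String, String>();
        partitionSpecs.put("ds", "2013-04-26");
        // Moves the matching partition (metadata plus data directory) from
        // default.source_tbl into default.target_tbl, mirroring the call
        // DDLTask.exchangeTablePartition() makes above.
        db.exchangeTablePartitions(partitionSpecs, "default", "source_tbl",
            "default", "target_tbl");
      }
    }

The truncateTable() branch is analogous: when TruncateTableDesc carries column
indexes, DDLTask launches a ColumnTruncateTask over the table's RCFiles instead
of simply deleting its files.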
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Apr 26 04:59:50 2013
@@ -412,7 +412,7 @@ public class ExecDriver extends Task<Map
LOG.info("Add 1 archive file to distributed cache. Archive file: " + hdfsFilePath.toUri());
}
}
-
+ work.configureJobConf(job);
addInputPaths(job, work, emptyScratchDirStr, ctx);
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Fri Apr 26 04:59:50 2013
@@ -50,7 +50,6 @@ public class ExecMapper extends MapReduc
private JobConf jc;
private boolean abort = false;
private Reporter rp;
- private List<OperatorHook> opHooks;
public static final Log l4j = LogFactory.getLog("ExecMapper");
private static boolean done;
@@ -99,7 +98,6 @@ public class ExecMapper extends MapReduc
mo.setExecContext(execContext);
mo.initializeLocalWork(jc);
mo.initialize(jc, null);
- opHooks = OperatorHookUtils.getOperatorHooks(jc);
if (localWork == null) {
return;
@@ -132,7 +130,6 @@ public class ExecMapper extends MapReduc
rp = reporter;
mo.setOutputCollector(oc);
mo.setReporter(rp);
- mo.setOperatorHooks(opHooks);
MapredContext.get().setReporter(reporter);
}
// reset the execContext for each new row
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Fri Apr 26 04:59:50 2013
@@ -66,7 +66,6 @@ public class ExecReducer extends MapRedu
private long nextCntr = 1;
private static String[] fieldNames;
- private List<OperatorHook> opHooks;
public static final Log l4j = LogFactory.getLog("ExecReducer");
private boolean isLogInfoEnabled = false;
@@ -152,7 +151,6 @@ public class ExecReducer extends MapRedu
try {
l4j.info(reducer.dump(0));
reducer.initialize(jc, rowObjectInspector);
- opHooks = OperatorHookUtils.getOperatorHooks(jc);
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
@@ -183,7 +181,6 @@ public class ExecReducer extends MapRedu
rp = reporter;
reducer.setOutputCollector(oc);
reducer.setReporter(rp);
- reducer.setOperatorHooks(opHooks);
MapredContext.get().setReporter(reporter);
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Fri Apr 26 04:59:50 2013
@@ -755,7 +755,9 @@ public class FileSinkOperator extends Te
// check # of dp
if (valToPaths.size() > maxPartitions) {
// throw fatal error
- incrCounter(fatalErrorCntr, 1);
+ if (counterNameToEnum != null) {
+ incrCounter(fatalErrorCntr, 1);
+ }
fatalError = true;
LOG.error("Fatal error was thrown due to exceeding number of dynamic partitions");
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Apr 26 04:59:50 2013
@@ -58,12 +58,12 @@ import org.apache.hadoop.hive.serde2.laz
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -90,8 +90,6 @@ public class GroupByOperator extends Ope
protected transient ObjectInspector[][] aggregationParameterObjectInspectors;
protected transient ObjectInspector[][] aggregationParameterStandardObjectInspectors;
protected transient Object[][] aggregationParameterObjects;
- // In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in
- // the same SQL clause,
// so aggregationIsDistinct is a boolean array instead of a single number.
protected transient boolean[] aggregationIsDistinct;
// Map from integer tag to distinct aggrs
@@ -887,8 +885,15 @@ public class GroupByOperator extends Ope
// Forward the current keys if needed for sort-based aggregation
if (currentKeys != null && !keysAreEqual) {
- forward(currentKeys.getKeyArray(), aggregations);
- countAfterReport = 0;
+ // This is to optimize queries of the form:
+ // select count(distinct key) from T
+ // where T is sorted and bucketized by key
+ // Partial aggregation is performed on the mapper, and the
+ // reducer gets 1 row (partial result) per mapper.
+ if (!conf.isDontResetAggrsDistinct()) {
+ forward(currentKeys.getKeyArray(), aggregations);
+ countAfterReport = 0;
+ }
}
// Need to update the keys?
@@ -900,7 +905,10 @@ public class GroupByOperator extends Ope
}
// Reset the aggregations
- resetAggregations(aggregations);
+ // For distincts optimization with sorting/bucketing, perform partial aggregation
+ if (!conf.isDontResetAggrsDistinct()) {
+ resetAggregations(aggregations);
+ }
// clear parameters in last-invoke
for (int i = 0; i < aggregationsParametersLastInvoke.length; i++) {
@@ -1076,7 +1084,7 @@ public class GroupByOperator extends Ope
try {
// put the hash related stats in statsMap if applicable, so that they
// are sent to jt as counters
- if (hashAggr) {
+ if (hashAggr && counterNameToEnum != null) {
incrCounter(counterNameHashOut, numRowsHashTbl);
}
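
To make the new isDontResetAggrsDistinct() guard concrete: when the input is
sorted and bucketed by the distinct key, the group-by can keep one running
aggregation per mapper instead of forwarding and resetting at every key
boundary. A standalone sketch of that idea (not Hive code; the real operator
works over aggregation buffers, not a plain counter):

    public class SortedDistinctCountSketch {
      // count(DISTINCT key) over a sorted run: bump the partial aggregate at
      // each key boundary and never reset it, so the mapper emits a single
      // partial result for the reducer to sum.
      public static long countDistinctSorted(long[] sortedKeys) {
        long distinct = 0;
        Long prev = null;
        for (long k : sortedKeys) {
          if (prev == null || k != prev) {
            distinct++;   // key boundary; no forward(), no resetAggregations()
          }
          prev = k;
        }
        return distinct;
      }

      public static void main(String[] args) {
        // prints 4: the distinct keys are 1, 2, 3, 7
        System.out.println(countDistinctSorted(new long[]{1, 1, 2, 3, 3, 3, 7}));
      }
    }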
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Fri Apr 26 04:59:50 2013
@@ -54,6 +54,7 @@ public class HashTableSinkOperator exten
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
+ protected static MapJoinMetaData metadata = new MapJoinMetaData();
// from abstract map join operator
/**
* The expressions for join inputs's join keys.
@@ -164,6 +165,10 @@ public class HashTableSinkOperator exten
}
+ public static MapJoinMetaData getMetadata() {
+ return metadata;
+ }
+
private static final transient String[] FATAL_ERR_MSG = {
null, // counter value 0 means no error
"Mapside join exceeds available memory. "
@@ -301,8 +306,7 @@ public class HashTableSinkOperator exten
null);
keySerializer.initialize(null, keyTableDesc.getProperties());
- MapJoinMetaData.clear();
- MapJoinMetaData.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
+ metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
}
@@ -349,7 +353,8 @@ public class HashTableSinkOperator exten
// Construct externalizable objects for key and value
if (needNewKey) {
- MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
+ MapJoinObjectValue valueObj = new MapJoinObjectValue(
+ metadataValueTag[tag], res);
rowNumber++;
if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
@@ -391,7 +396,7 @@ public class HashTableSinkOperator exten
.getStandardStructObjectInspector(newNames, newFields);
int alias = Integer.valueOf(metadataValueTag[tag]);
- MapJoinMetaData.put(alias, new HashTableSinkObjectCtx(
+ metadata.put(Integer.valueOf(metadataValueTag[tag]), new HashTableSinkObjectCtx(
standardOI, valueSerDe, valueTableDesc, hasFilter(alias), hconf));
}
@@ -435,7 +440,7 @@ public class HashTableSinkOperator exten
super.closeOp(abort);
} catch (Exception e) {
- LOG.error("Generate Hashtable error");
+ LOG.error("Generate Hashtable error", e);
e.printStackTrace();
}
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Fri Apr 26 04:59:50 2013
@@ -269,4 +269,14 @@ public class JoinOperator extends Common
// optimizations for now.
return false;
}
+
+ @Override
+ public boolean opAllowedBeforeSortMergeJoin() {
+ // If a join occurs before the sort-merge join, it is not useful to convert the sort-merge
+ // join to a mapjoin. It might be simpler to perform the join and then the sort-merge
+ // join. By converting the sort-merge join to a map-join, the job will be executed in 2
+ // mapjoins in the best case. The number of inputs for the join is more than 1, so it would
+ // be difficult to figure out the big table for the mapjoin.
+ return false;
+ }
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java Fri Apr 26 04:59:50 2013
@@ -24,20 +24,21 @@ import java.util.Map;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
public class MapJoinMetaData {
- static transient Map<Integer, HashTableSinkObjectCtx> mapMetadata = new HashMap<Integer, HashTableSinkObjectCtx>();
+ transient Map<Integer, HashTableSinkObjectCtx> mapMetadata =
+ new HashMap<Integer, HashTableSinkObjectCtx>();
static ArrayList<Object> list = new ArrayList<Object>();
public MapJoinMetaData(){
}
- public static void put(Integer key, HashTableSinkObjectCtx value){
+ public void put(Integer key, HashTableSinkObjectCtx value){
mapMetadata.put(key, value);
}
- public static HashTableSinkObjectCtx get(Integer key){
+ public HashTableSinkObjectCtx get(Integer key){
return mapMetadata.get(key);
}
- public static void clear(){
+ public void clear(){
mapMetadata.clear();
}
@@ -45,5 +46,4 @@ public class MapJoinMetaData {
list.clear();
return list;
}
-
}
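
The effect of dropping the static methods: metadata is now owned by whichever
operator populated it, with HashTableSinkOperator holding the write-side map
and MapJoinOperator the read-side map (the persistence classes further down are
updated to match). A compile-only sketch of the new lookup pattern, using the
accessors added in this commit:

    import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
    import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
    import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
    import org.apache.hadoop.hive.ql.exec.MapJoinOperator;

    public class MetadataOwnershipSketch {
      static HashTableSinkObjectCtx lookup(int tag, boolean writeSide) {
        MapJoinMetaData md = writeSide
            ? HashTableSinkOperator.getMetadata()  // populated while building hashtables
            : MapJoinOperator.getMetadata();       // populated while loading them
        return md.get(Integer.valueOf(tag));
      }
    }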
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Apr 26 04:59:50 2013
@@ -54,6 +54,11 @@ public class MapJoinOperator extends Abs
protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
+ protected static MapJoinMetaData metadata = new MapJoinMetaData();
+ public static MapJoinMetaData getMetadata() {
+ return metadata;
+ }
+
private static final transient String[] FATAL_ERR_MSG = {
null, // counter value 0 means no error
"Mapside join exceeds available memory. "
@@ -117,7 +122,7 @@ public class MapJoinOperator extends Abs
SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
null);
keySerializer.initialize(null, keyTableDesc.getProperties());
- MapJoinMetaData.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
+ metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
@@ -136,7 +141,7 @@ public class MapJoinOperator extends Abs
valueSerDe.initialize(null, valueTableDesc.getProperties());
ObjectInspector inspector = valueSerDe.getObjectInspector();
- MapJoinMetaData.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils
+ metadata.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils
.getStandardObjectInspector(inspector, ObjectInspectorCopyOption.WRITABLE),
valueSerDe, valueTableDesc, hasFilter(pos), hconf));
}
@@ -189,8 +194,8 @@ public class MapJoinOperator extends Abs
hashtable.initilizePersistentHash(path.toUri().getPath());
}
} catch (Exception e) {
- LOG.error("Load Distributed Cache Error");
- throw new HiveException(e.getMessage());
+ LOG.error("Load Distributed Cache Error", e);
+ throw new HiveException(e);
}
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Fri Apr 26 04:59:50 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.Context;
@@ -46,7 +47,6 @@ import org.apache.hadoop.hive.ql.io.rcfi
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -394,7 +394,8 @@ public class MoveTask extends Task<MoveW
}
dc = null; // reset data container to prevent it being added again.
} else { // static partitions
- List<String> partVals = Hive.getPvals(table.getPartCols(), tbd.getPartitionSpec());
+ List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
+ tbd.getPartitionSpec());
db.validatePartitionNameCharacters(partVals);
db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(),
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Apr 26 04:59:50 2013
@@ -55,7 +55,6 @@ public abstract class Operator<T extends
// Bean methods
private static final long serialVersionUID = 1L;
- List<OperatorHook> operatorHooks;
private Configuration configuration;
protected List<Operator<? extends OperatorDesc>> childOperators;
@@ -241,17 +240,6 @@ public abstract class Operator<T extends
return id;
}
- public void setOperatorHooks(List<OperatorHook> opHooks){
- operatorHooks = opHooks;
- if (childOperators == null) {
- return;
- }
-
- for (Operator<? extends OperatorDesc> op : childOperators) {
- op.setOperatorHooks(opHooks);
- }
- }
-
public void setReporter(Reporter rep) {
reporter = rep;
@@ -436,34 +424,6 @@ public abstract class Operator<T extends
}
}
- private void enterOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
- if (this.operatorHooks == null) {
- return;
- }
- for(OperatorHook opHook : this.operatorHooks) {
- opHook.enter(opHookContext);
- }
- }
-
- private void exitOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
- if (this.operatorHooks == null) {
- return;
- }
- for(OperatorHook opHook : this.operatorHooks) {
- opHook.exit(opHookContext);
- }
- }
-
- private void closeOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
- if (this.operatorHooks == null) {
- return;
- }
- for(OperatorHook opHook : this.operatorHooks) {
- opHook.close(opHookContext);
- }
- }
-
-
/**
* Collects all the parent's output object inspectors and calls actual
* initialization method.
@@ -525,12 +485,22 @@ public abstract class Operator<T extends
if (fatalError) {
return;
}
- OperatorHookContext opHookContext = new OperatorHookContext(this, row, tag);
- preProcessCounter();
- enterOperatorHooks(opHookContext);
- processOp(row, tag);
- exitOperatorHooks(opHookContext);
- postProcessCounter();
+
+ if (counterNameToEnum != null) {
+ inputRows++;
+ if ((inputRows % 1000) == 0) {
+ incrCounter(numInputRowsCntr, inputRows);
+ incrCounter(timeTakenCntr, totalTime);
+ inputRows = 0;
+ totalTime = 0;
+ }
+
+ beginTime = System.currentTimeMillis();
+ processOp(row, tag);
+ totalTime += (System.currentTimeMillis() - beginTime);
+ } else {
+ processOp(row, tag);
+ }
}
// If a operator wants to do some work at the beginning of a group
@@ -606,13 +576,14 @@ public abstract class Operator<T extends
state = State.CLOSE;
LOG.info(id + " finished. closing... ");
- incrCounter(numInputRowsCntr, inputRows);
- incrCounter(numOutputRowsCntr, outputRows);
- incrCounter(timeTakenCntr, totalTime);
+ if (counterNameToEnum != null) {
+ incrCounter(numInputRowsCntr, inputRows);
+ incrCounter(numOutputRowsCntr, outputRows);
+ incrCounter(timeTakenCntr, totalTime);
+ }
LOG.info(id + " forwarded " + cntr + " rows");
- closeOperatorHooks(new OperatorHookContext(this));
// call the operator specific close routine
closeOp(abort);
@@ -822,9 +793,11 @@ public abstract class Operator<T extends
protected void forward(Object row, ObjectInspector rowInspector)
throws HiveException {
- if ((++outputRows % 1000) == 0) {
- incrCounter(numOutputRowsCntr, outputRows);
- outputRows = 0;
+ if (counterNameToEnum != null) {
+ if ((++outputRows % 1000) == 0) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ outputRows = 0;
+ }
}
if (isLogInfoEnabled) {
@@ -1158,39 +1131,12 @@ public abstract class Operator<T extends
protected transient Object groupKeyObject;
/**
- * this is called before operator process to buffer some counters.
- */
- private void preProcessCounter() {
- inputRows++;
- if ((inputRows % 1000) == 0) {
- incrCounter(numInputRowsCntr, inputRows);
- incrCounter(timeTakenCntr, totalTime);
- inputRows = 0;
- totalTime = 0;
- }
- beginTime = System.currentTimeMillis();
- }
-
- /**
- * this is called after operator process to buffer some counters.
- */
- private void postProcessCounter() {
- if (counterNameToEnum != null) {
- totalTime += (System.currentTimeMillis() - beginTime);
- }
- }
-
- /**
* this is called in operators in map or reduce tasks.
*
* @param name
* @param amount
*/
protected void incrCounter(String name, long amount) {
- if(counterNameToEnum == null) {
- return;
- }
-
String counterName = getWrappedCounterName(name);
ProgressCounter pc = counterNameToEnum.get(counterName);
@@ -1525,6 +1471,15 @@ public abstract class Operator<T extends
return true;
}
+ /*
* If this task contains a sort-merge join, it can be converted to a map-join task only if
* every operator present in the mapper allows it. For example, if a sort-merge join operator
* is followed by a regular join, it cannot be converted to an auto map-join.
+ */
+ public boolean opAllowedBeforeSortMergeJoin() {
+ return true;
+ }
+
public String toString() {
return getName() + "[" + getIdentifier() + "]";
}
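
The process() rewrite above inlines what preProcessCounter() and
postProcessCounter() used to do, and skips the bookkeeping entirely when no
counter mapping is configured. A standalone sketch of that buffered-counter
pattern, with a made-up Sink interface standing in for the real ProgressCounter
machinery:

    public class BufferedCounterSketch {
      interface Sink { void incr(String name, long amount); }

      private final Sink sink;   // null plays the role of counterNameToEnum == null
      private long inputRows = 0;
      private long totalTime = 0;

      BufferedCounterSketch(Sink sink) { this.sink = sink; }

      void process(Runnable op) {
        if (sink == null) {      // fast path: no per-row counter overhead
          op.run();
          return;
        }
        inputRows++;
        if (inputRows % 1000 == 0) {   // flush buffered counts every 1000 rows
          sink.incr("NUM_INPUT_ROWS", inputRows);
          sink.incr("TIME_TAKEN", totalTime);
          inputRows = 0;
          totalTime = 0;
        }
        long begin = System.currentTimeMillis();
        op.run();
        totalTime += System.currentTimeMillis() - begin;
      }
    }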
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Fri Apr 26 04:59:50 2013
@@ -72,14 +72,7 @@ public class PTFOperator extends Operato
hiveConf = new HiveConf(jobConf, PTFOperator.class);
// if the parent is ExtractOperator, this invocation is from reduce-side
Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
- if (parentOp instanceof ExtractOperator)
- {
- isMapOperator = false;
- }
- else
- {
- isMapOperator = true;
- }
+ isMapOperator = conf.isMapSide();
reconstructQueryDef(hiveConf);
inputPart = createFirstPartitionForChain(
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java Fri Apr 26 04:59:50 2013
@@ -31,6 +31,7 @@ import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -261,25 +262,26 @@ public class PTFUtils {
}
}
- public static void makeTransient(Class<?> beanClass, String pdName)
- {
- BeanInfo info;
- try
- {
- info = Introspector.getBeanInfo(beanClass);
- PropertyDescriptor[] propertyDescriptors = info
- .getPropertyDescriptors();
- for (int i = 0; i < propertyDescriptors.length; ++i)
- {
- PropertyDescriptor pd = propertyDescriptors[i];
- if (pd.getName().equals(pdName))
- {
- pd.setValue("transient", Boolean.TRUE);
+ public static void makeTransient(Class<?> beanClass, String... pdNames) {
+ try {
+ BeanInfo info = Introspector.getBeanInfo(beanClass);
+ PropertyDescriptor[] descs = info.getPropertyDescriptors();
+ if (descs == null) {
+ throw new RuntimeException("Cannot access property descriptor for class " + beanClass);
+ }
+ Map<String, PropertyDescriptor> mapping = new HashMap<String, PropertyDescriptor>();
+ for (PropertyDescriptor desc : descs) {
+ mapping.put(desc.getName(), desc);
+ }
+ for (String pdName : pdNames) {
+ PropertyDescriptor desc = mapping.get(pdName);
+ if (desc == null) {
+ throw new RuntimeException("Property " + pdName + " does not exist in " + beanClass);
}
+ desc.setValue("transient", Boolean.TRUE);
}
}
- catch (IntrospectionException ie)
- {
+ catch (IntrospectionException ie) {
throw new RuntimeException(ie);
}
}
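
Usage sketch for the varargs makeTransient(): one call can now mark several
bean properties transient, and unknown property names fail fast instead of
being silently ignored. The bean class below is hypothetical; any class with
getter/setter pairs behaves the same way.

    import java.beans.BeanInfo;
    import java.beans.Introspector;
    import java.beans.PropertyDescriptor;

    import org.apache.hadoop.hive.ql.exec.PTFUtils;

    public class MakeTransientSketch {
      public static class Bean {
        private String kept;
        private String skipped;
        public String getKept() { return kept; }
        public void setKept(String v) { kept = v; }
        public String getSkipped() { return skipped; }
        public void setSkipped(String v) { skipped = v; }
      }

      public static void main(String[] args) throws Exception {
        PTFUtils.makeTransient(Bean.class, "skipped");  // XMLEncoder will skip it
        BeanInfo info = Introspector.getBeanInfo(Bean.class);
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
          System.out.println(pd.getName() + " transient=" + pd.getValue("transient"));
        }
      }
    }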
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Fri Apr 26 04:59:50 2013
@@ -80,6 +80,15 @@ public class ReduceSinkOperator extends
transient byte[] tagByte = new byte[1];
transient protected int numDistributionKeys;
transient protected int numDistinctExprs;
+ transient String inputAlias; // input alias of this RS for join (used for PPD)
+
+ public void setInputAlias(String inputAlias) {
+ this.inputAlias = inputAlias;
+ }
+
+ public String getInputAlias() {
+ return inputAlias;
+ }
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Fri Apr 26 04:59:50 2013
@@ -80,6 +80,7 @@ public abstract class Task<T extends Ser
// hive.auto.convert.join.noconditionaltask is set to true. No conditional task was
// created in case the mapjoin failed.
public static final int MAPJOIN_ONLY_NOBACKUP = 6;
+ public static final int CONVERTED_SORTMERGEJOIN = 7;
// Descendants tasks who subscribe feeds from this task
protected transient List<Task<? extends Serializable>> feedSubscribers;
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Fri Apr 26 04:59:50 2013
@@ -163,4 +163,12 @@ public class UnionOperator extends Opera
public boolean opAllowedAfterMapJoin() {
return false;
}
+
+ @Override
+ public boolean opAllowedBeforeSortMergeJoin() {
+ // If a union occurs before the sort-merge join, it is not useful to convert the
+ // sort-merge join to a mapjoin. The number of inputs for the union is more than 1, so
+ // it would be difficult to figure out the big table for the mapjoin.
+ return false;
+ }
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Apr 26 04:59:50 2013
@@ -380,7 +380,7 @@ public final class Utilities {
public static String getHiveJobID(Configuration job) {
String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
- if (planPath != null) {
+ if (planPath != null && !planPath.isEmpty()) {
return (new Path(planPath)).getName();
}
return null;
@@ -2415,8 +2415,5 @@ public final class Utilities {
return sb.toString();
}
-
- public static Class getBuiltinUtilsClass() throws ClassNotFoundException {
- return Class.forName("org.apache.hive.builtins.BuiltinUtils");
- }
}
+
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Fri Apr 26 04:59:50 2013
@@ -158,8 +158,6 @@ public class HashMapWrapper<K, V> implem
}
public boolean isAbort(long numRows,LogHelper console) {
- System.gc();
- System.gc();
int size = mHash.size();
long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
double rate = (double) usedMemory / (double) maxMemory;
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java Fri Apr 26 04:59:50 2013
@@ -23,8 +23,10 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -93,7 +95,7 @@ public class MapJoinDoubleKeys extends A
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
try {
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(Integer.valueOf(metadataTag));
Writable val = ctx.getSerDe().getSerializedClass().newInstance();
val.readFields(in);
@@ -124,7 +126,8 @@ public class MapJoinDoubleKeys extends A
try {
// out.writeInt(metadataTag);
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
+ Integer.valueOf(metadataTag));
ArrayList<Object> list = MapJoinMetaData.getList();
list.add(obj1);
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java Fri Apr 26 04:59:50 2013
@@ -23,8 +23,9 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -95,7 +96,7 @@ public class MapJoinObjectKey extends A
ClassNotFoundException {
try {
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(
+ HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(
Integer.valueOf(metadataTag));
Writable val = ctx.getSerDe().getSerializedClass().newInstance();
@@ -119,7 +120,7 @@ public class MapJoinObjectKey extends A
public void writeExternal(ObjectOutput out) throws IOException {
try {
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(
+ HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
Integer.valueOf(metadataTag));
// Different processing for key and value
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java Fri Apr 26 04:59:50 2013
@@ -24,8 +24,10 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -89,7 +91,8 @@ public class MapJoinObjectValue implemen
metadataTag = in.readInt();
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(
+ Integer.valueOf(metadataTag));
int sz = in.readInt();
MapJoinRowContainer<Object[]> res = new MapJoinRowContainer<Object[]>();
if (sz > 0) {
@@ -132,7 +135,8 @@ public class MapJoinObjectValue implemen
out.writeInt(metadataTag);
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
+ Integer.valueOf(metadataTag));
// Different processing for key and value
MapJoinRowContainer<Object[]> v = obj;
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java Fri Apr 26 04:59:50 2013
@@ -23,8 +23,10 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -74,10 +76,12 @@ public class MapJoinSingleKey extends Ab
}
@Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(ObjectInput in)
+ throws IOException, ClassNotFoundException {
try {
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(
+ Integer.valueOf(metadataTag));
Writable val = ctx.getSerDe().getSerializedClass().newInstance();
val.readFields(in);
@@ -106,7 +110,8 @@ public class MapJoinSingleKey extends Ab
try {
// out.writeInt(metadataTag);
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
+ Integer.valueOf(metadataTag));
ArrayList<Object> list = MapJoinMetaData.getList();
list.add(obj);
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Fri Apr 26 04:59:50 2013
@@ -243,6 +243,12 @@ public class RCFile {
this.numberRows = numberRows;
}
+ public void nullColumn(int columnIndex) {
+ eachColumnValueLen[columnIndex] = 0;
+ eachColumnUncompressedValueLen[columnIndex] = 0;
+ allCellValLenBuffer[columnIndex] = new NonSyncDataOutputBuffer();
+ }
+
/**
* add in a new column's meta data.
*
@@ -553,6 +559,14 @@ public class RCFile {
}
}
+ public void nullColumn(int columnIndex) {
+ if (codec != null) {
+ compressedColumnsValueBuffer[columnIndex].reset();
+ } else {
+ loadedColumnsValueBuffer[columnIndex].reset();
+ }
+ }
+
public void clearColumnBuffer() throws IOException {
decompressBuffer.reset();
}
@@ -1077,6 +1091,7 @@ public class RCFile {
public int rowReadIndex;
public int runLength;
public int prvLength;
+ public boolean isNulled;
}
private final Path file;
private final FSDataInputStream in;
@@ -1491,6 +1506,7 @@ public class RCFile {
col.rowReadIndex = 0;
col.runLength = 0;
col.prvLength = -1;
+ col.isNulled = colValLenBufferReadIn[selIx].getLength() == 0;
}
return currentKeyLength;
@@ -1694,18 +1710,22 @@ public class RCFile {
SelectedColumn col = selectedColumns[j];
int i = col.colIndex;
- BytesRefWritable ref = ret.unCheckedGet(i);
+ if (col.isNulled) {
+ ret.set(i, null);
+ } else {
+ BytesRefWritable ref = ret.unCheckedGet(i);
- colAdvanceRow(j, col);
+ colAdvanceRow(j, col);
- if (currentValue.decompressedFlag[j]) {
- ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
- col.rowReadIndex, col.prvLength);
- } else {
- ref.set(currentValue.lazyDecompressCallbackObjs[j],
- col.rowReadIndex, col.prvLength);
+ if (currentValue.decompressedFlag[j]) {
+ ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+ col.rowReadIndex, col.prvLength);
+ } else {
+ ref.set(currentValue.lazyDecompressCallbackObjs[j],
+ col.rowReadIndex, col.prvLength);
+ }
+ col.rowReadIndex += col.prvLength;
}
- col.rowReadIndex += col.prvLength;
}
} else {
// This version of the loop eliminates a condition check and branch
@@ -1714,12 +1734,16 @@ public class RCFile {
SelectedColumn col = selectedColumns[j];
int i = col.colIndex;
- BytesRefWritable ref = ret.unCheckedGet(i);
+ if (col.isNulled) {
+ ret.set(i, null);
+ } else {
+ BytesRefWritable ref = ret.unCheckedGet(i);
- colAdvanceRow(j, col);
- ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
- col.rowReadIndex, col.prvLength);
- col.rowReadIndex += col.prvLength;
+ colAdvanceRow(j, col);
+ ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+ col.rowReadIndex, col.prvLength);
+ col.rowReadIndex += col.prvLength;
+ }
}
}
rowFetched = true;
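
The reader-side convention behind these hunks: a column removed by column
truncation is stored with zero-length value metadata, nullColumn() resets its
buffers on the write path, and isNulled is derived from the key metadata alone,
so truncated cells read back as null. A tiny standalone analogue of that
contract (plain arrays in place of the NonSyncDataOutputBuffer machinery):

    import java.util.Arrays;

    public class TruncatedColumnSketch {
      public static void main(String[] args) {
        int[] columnValueLengths = {8, 0, 8};  // column 1 was nullColumn()'ed
        Object[] row = new Object[3];
        for (int i = 0; i < row.length; i++) {
          boolean isNulled = columnValueLengths[i] == 0;  // same test as isNulled above
          row[i] = isNulled ? null : "cell-" + i;
        }
        System.out.println(Arrays.toString(row));  // [cell-0, null, cell-2]
      }
    }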
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Fri Apr 26 04:59:50 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -70,13 +71,23 @@ public final class OrcFile {
*/
public static Writer createWriter(FileSystem fs,
Path path,
+ Configuration conf,
ObjectInspector inspector,
long stripeSize,
CompressionKind compress,
int bufferSize,
int rowIndexStride) throws IOException {
return new WriterImpl(fs, path, inspector, stripeSize, compress,
- bufferSize, rowIndexStride);
+ bufferSize, rowIndexStride, getMemoryManager(conf));
}
+ private static MemoryManager memoryManager = null;
+
+ private static synchronized
+ MemoryManager getMemoryManager(Configuration conf) {
+ if (memoryManager == null) {
+ memoryManager = new MemoryManager(conf);
+ }
+ return memoryManager;
+ }
}
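
Caller-side sketch of the new createWriter() signature: the added Configuration
argument is what seeds the lazily created, process-wide MemoryManager that all
ORC writers now share. The sizes below mirror common ORC defaults but are
illustrative only.

    import java.util.ArrayList;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

    public class OrcWriterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        ObjectInspector inspector = ObjectInspectorFactory
            .getStandardStructObjectInspector(new ArrayList<String>(),
                new ArrayList<ObjectInspector>());
        // empty struct schema, as in OrcOutputFormat's no-row close() path below
        OrcFile.createWriter(fs, new Path("/tmp/sketch.orc"), conf, inspector,
            256L * 1024 * 1024, CompressionKind.ZLIB, 256 * 1024, 10000).close();
      }
    }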
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Fri Apr 26 04:59:50 2013
@@ -71,8 +71,8 @@ public class OrcOutputFormat extends Fil
public void write(NullWritable nullWritable,
OrcSerdeRow row) throws IOException {
if (writer == null) {
- writer = OrcFile.createWriter(fs, path, row.getInspector(), stripeSize,
- compress, compressionSize, rowIndexStride);
+ writer = OrcFile.createWriter(fs, path, this.conf, row.getInspector(),
+ stripeSize, compress, compressionSize, rowIndexStride);
}
writer.addRow(row.getRow());
}
@@ -81,8 +81,9 @@ public class OrcOutputFormat extends Fil
public void write(Writable row) throws IOException {
OrcSerdeRow serdeRow = (OrcSerdeRow) row;
if (writer == null) {
- writer = OrcFile.createWriter(fs, path, serdeRow.getInspector(),
- stripeSize, compress, compressionSize, rowIndexStride);
+ writer = OrcFile.createWriter(fs, path, this.conf,
+ serdeRow.getInspector(), stripeSize, compress, compressionSize,
+ rowIndexStride);
}
writer.addRow(serdeRow.getRow());
}
@@ -101,8 +102,8 @@ public class OrcOutputFormat extends Fil
ObjectInspector inspector = ObjectInspectorFactory.
getStandardStructObjectInspector(new ArrayList<String>(),
new ArrayList<ObjectInspector>());
- writer = OrcFile.createWriter(fs, path, inspector, stripeSize,
- compress, compressionSize, rowIndexStride);
+ writer = OrcFile.createWriter(fs, path, this.conf, inspector,
+ stripeSize, compress, compressionSize, rowIndexStride);
}
writer.close();
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java Fri Apr 26 04:59:50 2013
@@ -17,6 +17,13 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -31,16 +38,9 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.Writable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
final class OrcStruct implements Writable {
- private final Object[] fields;
+ private Object[] fields;
OrcStruct(int children) {
fields = new Object[children];
@@ -54,6 +54,14 @@ final class OrcStruct implements Writabl
fields[fieldIndex] = value;
}
+ public int getNumFields() {
+ return fields.length;
+ }
+
+ public void setNumFields(int numFields) {
+ fields = new Object[numFields];
+ }
+
@Override
public void write(DataOutput dataOutput) throws IOException {
throw new UnsupportedOperationException("write unsupported");
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Apr 26 04:59:50 2013
@@ -733,8 +733,9 @@ class RecordReaderImpl implements Record
} else {
length = dictionaryBuffer.size() - offset;
}
- // If the column is just empty strings, the size will be zero, so the buffer will be null,
- // in that case just return result as it will default to empty
+ // If the column is just empty strings, the size will be zero,
+ // so the buffer will be null, in that case just return result
+ // as it will default to empty
if (dictionaryBuffer != null) {
dictionaryBuffer.setText(result, offset, length);
} else {
@@ -788,6 +789,13 @@ class RecordReaderImpl implements Record
result = new OrcStruct(fields.length);
} else {
result = (OrcStruct) previous;
+
+ // If the input format was initialized with a file with a
+ // different number of fields, the number of fields needs to
+ // be updated to the correct number
+ if (result.getNumFields() != fields.length) {
+ result.setNumFields(fields.length);
+ }
}
for(int i=0; i < fields.length; ++i) {
if (fields[i] != null) {
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Fri Apr 26 04:59:50 2013
@@ -18,8 +18,15 @@
package org.apache.hadoop.hive.ql.io.orc;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,14 +51,8 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.io.BytesWritable;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
/**
* An ORC file writer. The file is divided into stripes, which is the natural
@@ -62,7 +63,7 @@ import java.util.TreeMap;
* sub-types. Each of the TreeWriters writes the column's data as a set of
* streams.
*/
-class WriterImpl implements Writer {
+class WriterImpl implements Writer, MemoryManager.Callback {
private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private static final int MIN_ROW_INDEX_STRIDE = 1000;
@@ -97,6 +98,7 @@ class WriterImpl implements Writer {
private final OrcProto.RowIndex.Builder rowIndex =
OrcProto.RowIndex.newBuilder();
private final boolean buildIndex;
+ private final MemoryManager memoryManager;
WriterImpl(FileSystem fs,
Path path,
@@ -104,13 +106,15 @@ class WriterImpl implements Writer {
long stripeSize,
CompressionKind compress,
int bufferSize,
- int rowIndexStride) throws IOException {
+ int rowIndexStride,
+ MemoryManager memoryManager) throws IOException {
this.fs = fs;
this.path = path;
this.stripeSize = stripeSize;
this.compress = compress;
this.bufferSize = bufferSize;
this.rowIndexStride = rowIndexStride;
+ this.memoryManager = memoryManager;
buildIndex = rowIndexStride > 0;
codec = createCodec(compress);
treeWriter = createTreeWriter(inspector, streamFactory, false);
@@ -118,6 +122,8 @@ class WriterImpl implements Writer {
throw new IllegalArgumentException("Row stride must be at least " +
MIN_ROW_INDEX_STRIDE);
}
+ // ensure that we are able to handle callbacks before we register ourselves
+ memoryManager.addWriter(path, stripeSize, this);
}
static CompressionCodec createCodec(CompressionKind kind) {
@@ -147,6 +153,13 @@ class WriterImpl implements Writer {
}
}
+ @Override
+ public void checkMemory(double newScale) throws IOException {
+ if (estimateStripeSize() > Math.round(stripeSize * newScale)) {
+ flushStripe();
+ }
+ }
+
/**
* This class is used to hold the contents of streams as they are buffered.
* The TreeWriters write to the outStream and the codec compresses the
@@ -734,19 +747,8 @@ class WriterImpl implements Writer {
int length = rows.size();
int rowIndexEntry = 0;
OrcProto.RowIndex.Builder rowIndex = getRowIndex();
- // need to build the first index entry out here, to handle the case of
- // not having any values.
- if (buildIndex) {
- while (0 == rowIndexValueCount.get(rowIndexEntry) &&
- rowIndexEntry < savedRowIndex.size()) {
- OrcProto.RowIndexEntry.Builder base =
- savedRowIndex.get(rowIndexEntry++).toBuilder();
- rowOutput.getPosition(new RowIndexPositionRecorder(base));
- rowIndex.addEntry(base.build());
- }
- }
// write the values translated into the dump order.
- for(int i = 0; i < length; ++i) {
+ for(int i = 0; i <= length; ++i) {
// now that we are writing out the row values, we can finalize the
// row index
if (buildIndex) {
@@ -758,7 +760,9 @@ class WriterImpl implements Writer {
rowIndex.addEntry(base.build());
}
}
- rowOutput.write(dumpOrder[rows.get(i)]);
+ if (i != length) {
+ rowOutput.write(dumpOrder[rows.get(i)]);
+ }
}
// we need to build the rowindex before calling super, since it
// writes it out.
@@ -1453,8 +1457,8 @@ class WriterImpl implements Writer {
}
}
// once every 1000 rows, check the size to see if we should spill
- if (rowsInStripe % 1000 == 0 && estimateStripeSize() > stripeSize) {
- flushStripe();
+ if (rowsInStripe % 1000 == 0) {
+ checkMemory(memoryManager.getAllocationScale());
}
}
@@ -1464,5 +1468,6 @@ class WriterImpl implements Writer {
int footerLength = writeFooter(rawWriter.getPos());
rawWriter.writeByte(writePostScript(footerLength));
rawWriter.close();
+ memoryManager.removeWriter(path);
}
}
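
The memory-manager changes in this file form one thread: the writer registers itself at construction, lets checkMemory() flush a stripe early when the shared allocation scale shrinks the effective stripe size, polls the scale every 1000 rows, and deregisters on close(). A minimal sketch of the contract those calls assume; paths are plain Strings here to keep the sketch dependency-free, and the real manager also sizes its pool from the JVM heap and configuration:

  import java.io.IOException;
  import java.util.HashMap;
  import java.util.Map;

  class MemoryManagerSketch {
    interface Callback {
      void checkMemory(double newScale) throws IOException;
    }

    private final long totalPool;          // bytes available to all writers
    private long totalAllocation = 0;      // sum of requested stripe sizes
    private final Map<String, Callback> writers = new HashMap<String, Callback>();
    private final Map<String, Long> allocations = new HashMap<String, Long>();

    MemoryManagerSketch(long totalPool) {
      this.totalPool = totalPool;
    }

    synchronized void addWriter(String path, long requested, Callback callback) {
      writers.put(path, callback);
      allocations.put(path, requested);
      totalAllocation += requested;
    }

    synchronized void removeWriter(String path) {
      Long requested = allocations.remove(path);
      if (requested != null) {
        totalAllocation -= requested;
        writers.remove(path);
      }
    }

    // writers collectively asking for more than the pool are scaled
    // down proportionally; WriterImpl polls this every 1000 rows
    synchronized double getAllocationScale() {
      return totalAllocation <= totalPool
          ? 1.0 : (double) totalPool / totalAllocation;
    }

    // the manager can also push a new scale to every writer, which is
    // what makes WriterImpl.checkMemory(double) a callback
    synchronized void notifyWriters() throws IOException {
      double scale = getAllocationScale();
      for (Callback writer : writers.values()) {
        writer.checkMemory(scale);
      }
    }
  }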
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java Fri Apr 26 04:59:50 2013
@@ -38,7 +38,7 @@ public class RCFileKeyBufferWrapper impl
protected CompressionCodec codec;
- protected RCFileKeyBufferWrapper() {
+ public RCFileKeyBufferWrapper() {
}
public static RCFileKeyBufferWrapper create(KeyBuffer currentKeyBufferObj) {
@@ -66,4 +66,48 @@ public class RCFileKeyBufferWrapper impl
return keyBuffer;
}
+ public void setKeyBuffer(KeyBuffer keyBuffer) {
+ this.keyBuffer = keyBuffer;
+ }
+
+ public int getRecordLength() {
+ return recordLength;
+ }
+
+ public void setRecordLength(int recordLength) {
+ this.recordLength = recordLength;
+ }
+
+ public int getKeyLength() {
+ return keyLength;
+ }
+
+ public void setKeyLength(int keyLength) {
+ this.keyLength = keyLength;
+ }
+
+ public int getCompressedKeyLength() {
+ return compressedKeyLength;
+ }
+
+ public void setCompressedKeyLength(int compressedKeyLength) {
+ this.compressedKeyLength = compressedKeyLength;
+ }
+
+ public Path getInputPath() {
+ return inputPath;
+ }
+
+ public void setInputPath(Path inputPath) {
+ this.inputPath = inputPath;
+ }
+
+ public CompressionCodec getCodec() {
+ return codec;
+ }
+
+ public void setCodec(CompressionCodec codec) {
+ this.codec = codec;
+ }
+
}
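
Opening up the constructor and adding the accessors lets code outside this package assemble a wrapper by hand; the new rcfile.truncate classes imported in DDLTask above are the likely consumer. A hypothetical caller, with all right-hand-side variables assumed to be in scope from reading an RCFile:

  RCFileKeyBufferWrapper wrapper = new RCFileKeyBufferWrapper();
  wrapper.setKeyBuffer(keyBuffer);                     // KeyBuffer read from the file
  wrapper.setRecordLength(recordLength);
  wrapper.setKeyLength(keyLength);
  wrapper.setCompressedKeyLength(compressedKeyLength);
  wrapper.setInputPath(inputPath);                     // Path of the source file
  wrapper.setCodec(codec);                             // codec used by the source file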
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java Fri Apr 26 04:59:50 2013
@@ -48,4 +48,12 @@ public class RCFileValueBufferWrapper im
return this.valueBuffer.compareTo(o.valueBuffer);
}
+ public ValueBuffer getValueBuffer() {
+ return valueBuffer;
+ }
+
+ public void setValueBuffer(ValueBuffer valueBuffer) {
+ this.valueBuffer = valueBuffer;
+ }
+
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java Fri Apr 26 04:59:50 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.laz
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -87,6 +88,11 @@ public class DefaultStorageHandler imple
}
@Override
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+ // do nothing by default
+ }
+
+ @Override
public Configuration getConf() {
return conf;
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Fri Apr 26 04:59:50 2013
@@ -1671,7 +1671,7 @@ private void constructOneLBLocationMap(F
List<String> names = null;
Table t = getTable(dbName, tblName);
- List<String> pvals = getPvals(t.getPartCols(), partSpec);
+ List<String> pvals = MetaStoreUtils.getPvals(t.getPartCols(), partSpec);
try {
names = getMSC().listPartitionNames(dbName, tblName, pvals, max);
@@ -1713,19 +1713,6 @@ private void constructOneLBLocationMap(F
}
}
- public static List<String> getPvals(List<FieldSchema> partCols,
- Map<String, String> partSpec) {
- List<String> pvals = new ArrayList<String>();
- for (FieldSchema field : partCols) {
- String val = partSpec.get(field.getName());
- if (val == null) {
- val = "";
- }
- pvals.add(val);
- }
- return pvals;
- }
-
/**
* get all the partitions of the table that matches the given partial
* specification. partition columns whose value is can be anything should be
@@ -1745,7 +1732,7 @@ private void constructOneLBLocationMap(F
"partitioned table");
}
- List<String> partialPvals = getPvals(tbl.getPartCols(), partialPartSpec);
+ List<String> partialPvals = MetaStoreUtils.getPvals(tbl.getPartCols(), partialPartSpec);
List<org.apache.hadoop.hive.metastore.api.Partition> partitions = null;
try {
@@ -2251,6 +2238,18 @@ private void constructOneLBLocationMap(F
}
}
+ public void exchangeTablePartitions(Map<String, String> partitionSpecs,
+ String sourceDb, String sourceTable, String destDb,
+ String destinationTableName) throws HiveException {
+ try {
+ getMSC().exchange_partition(partitionSpecs, sourceDb, sourceTable, destDb,
+ destinationTableName);
+ } catch (Exception ex) {
+ LOG.error(StringUtils.stringifyException(ex));
+ throw new HiveException(ex);
+ }
+ }
+
/**
* Creates a metastore client. Currently it creates only JDBC based client as
* File based store support is removed
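
The new exchangeTablePartitions() wrapper simply forwards to the metastore client and rethrows any failure as a HiveException. A sketch of a caller moving one partition between two tables; the database/table names and partition spec are illustrative, and both tables must agree on schema and partition columns for the metastore to accept the exchange:

  // assumes the usual imports (java.util.HashMap, java.util.Map,
  // org.apache.hadoop.hive.conf.HiveConf) and an existing HiveConf
  void moveDailyPartition(HiveConf conf) throws HiveException {
    Map<String, String> partitionSpecs = new HashMap<String, String>();
    partitionSpecs.put("ds", "2013-04-25");
    Hive db = Hive.get(conf);
    db.exchangeTablePartitions(partitionSpecs, "staging_db", "t_staging",
        "prod_db", "t_prod");
  }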
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java Fri Apr 26 04:59:50 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
/**
@@ -133,4 +134,12 @@ public interface HiveStorageHandler exte
public void configureTableJobProperties(
TableDesc tableDesc,
Map<String, String> jobProperties);
+
+ /**
+ * Called just before submitting the MapReduce job.
+ *
+ * @param tableDesc descriptor for the table being accessed
+ * @param jobConf job configuration for the MapReduce job
+ */
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf);
}
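
DefaultStorageHandler above supplies a no-op implementation, so only handlers that need the hook override it. A hypothetical handler pushing a table property into the job just before submission; the configuration key is made up, and "name" is assumed to be the metastore table-name property:

  import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
  import org.apache.hadoop.hive.ql.plan.TableDesc;
  import org.apache.hadoop.mapred.JobConf;

  public class ExampleStorageHandler extends DefaultStorageHandler {
    @Override
    public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
      // expose the target table to map tasks at runtime
      jobConf.set("example.handler.table",
          tableDesc.getProperties().getProperty("name"));
    }
  }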
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Fri Apr 26 04:59:50 2013
@@ -472,6 +472,10 @@ abstract public class AbstractSMBJoinPro
(BigTableSelectorForAutoSMJ) ReflectionUtils.newInstance(bigTableMatcherClass, null);
int bigTablePosition =
bigTableMatcher.getBigTablePosition(pGraphContext, joinOp);
+ if (bigTablePosition < 0) {
+ // a negative position means no big table could be chosen,
+ // e.g. the join contains aliases from a sub-query
+ return false;
+ }
context.setBigTablePosition(bigTablePosition);
String joinAlias =
bigTablePosition == 0 ?
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java Fri Apr 26 04:59:50 2013
@@ -57,6 +57,9 @@ public class AvgPartitionSizeBasedBigTab
getListTopOps(joinOp, topOps);
int currentPos = 0;
for (TableScanOperator topOp : topOps) {
+ if (topOp == null) {
+ return -1;
+ }
int numPartitions = 1; // in case the sizes match, preference is
// given to the table with fewer partitions
Table table = parseCtx.getTopToTable().get(topOp);
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Fri Apr 26 04:59:50 2013
@@ -249,6 +249,7 @@ public class BucketingSortingReduceSinkO
fsOp.getConf().setMultiFileSpray(false);
fsOp.getConf().setTotalFiles(1);
fsOp.getConf().setNumFiles(1);
+ fsOp.getConf().setRemovedReduceSinkBucketSort(true);
tsOp.setUseBucketizedHiveInputFormat(true);
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Fri Apr 26 04:59:50 2013
@@ -259,6 +259,9 @@ public class GenMRUnion1 implements Node
// Copy into the current union task plan if the sub-query is map-only and a root task
if (uPrsCtx.getMapOnlySubq(pos) && uPrsCtx.getRootTask(pos)) {
processSubQueryUnionMerge(ctx, uCtxTask, union, stack);
+ if (ctx.getRootTasks().contains(currTask)) {
+ ctx.getRootTasks().remove(currTask);
+ }
}
// If it a map-reduce job, create a temporary file
else {
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Fri Apr 26 04:59:50 2013
@@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -761,6 +763,41 @@ public final class GenMapRedUtils {
}
/**
+ * Set the key and value description for all the tasks rooted at the given
+ * task. Loops over all the tasks recursively.
+ *
+ * @param task the root of the task tree to process
+ */
+ public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> task) {
+
+ if (task instanceof ConditionalTask) {
+ List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+ .getListTasks();
+ for (Task<? extends Serializable> tsk : listTasks) {
+ setKeyAndValueDescForTaskTree(tsk);
+ }
+ } else if (task instanceof ExecDriver) {
+ MapredWork work = (MapredWork) task.getWork();
+ work.deriveExplainAttributes();
+ HashMap<String, Operator<? extends OperatorDesc>> opMap = work
+ .getAliasToWork();
+ if (opMap != null && !opMap.isEmpty()) {
+ for (Operator<? extends OperatorDesc> op : opMap.values()) {
+ setKeyAndValueDesc(work, op);
+ }
+ }
+ }
+
+ if (task.getChildTasks() == null) {
+ return;
+ }
+
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+ setKeyAndValueDescForTaskTree(childTask);
+ }
+ }
+
+ /**
* create a new plan and return.
*
* @return the new plan
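
The new helper recurses over both ConditionalTask children and ordinary child tasks, so a compiler pass only needs to seed it with the plan's roots. A sketch of the intended call site; the ctx variable and its getRootTasks() accessor follow the GenMR context used elsewhere in this commit:

  for (Task<? extends Serializable> rootTask : ctx.getRootTasks()) {
    GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
  }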
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Fri Apr 26 04:59:50 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.util.StringUtils;
/**
@@ -107,7 +109,7 @@ public class GroupByOptimizer implements
GraphWalker ogw = new DefaultGraphWalker(disp);
// Create a list of topop nodes
- ArrayList<Node> topNodes = new ArrayList<Node>();
+ List<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pctx.getTopOps().values());
ogw.startWalking(topNodes, null);
@@ -174,15 +176,83 @@ public class GroupByOptimizer implements
GroupByOptimizerSortMatch match = checkSortGroupBy(stack, groupByOp);
boolean useMapperSort =
HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT);
+ GroupByDesc groupByOpDesc = groupByOp.getConf();
+
+ boolean removeReduceSink = false;
+ boolean optimizeDistincts = false;
+ boolean setBucketGroup = false;
// Don't remove the operator for distincts
- if (useMapperSort && !groupByOp.getConf().isDistinct() &&
+ if (useMapperSort &&
(match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
- convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
+ if (!groupByOpDesc.isDistinct()) {
+ removeReduceSink = true;
+ }
+ else if (!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+ // Optimize the query: select count(distinct keys) from T, where
+ // T is bucketized and sorted by keys
+ // Partial aggregation can be done by the mappers in this scenario
+
+ List<ExprNodeDesc> keys =
+ ((GroupByOperator)
+ (groupByOp.getChildOperators().get(0).getChildOperators().get(0)))
+ .getConf().getKeys();
+ if ((keys == null) || (keys.isEmpty())) {
+ optimizeDistincts = true;
+ }
+ }
}
- else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
+
+ if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
(match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
- groupByOp.getConf().setBucketGroup(true);
+ setBucketGroup = true;
+ }
+
+ if (removeReduceSink) {
+ convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
+ }
+ else if (optimizeDistincts) {
+ // In test mode, don't change the query plan. However, set up a query property
+ pGraphContext.getQueryProperties().setHasMapGroupBy(true);
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE)) {
+ return;
+ }
+ ReduceSinkOperator reduceSinkOp =
+ (ReduceSinkOperator)groupByOp.getChildOperators().get(0);
+ GroupByDesc childGroupByDesc =
+ ((GroupByOperator)
+ (reduceSinkOp.getChildOperators().get(0))).getConf();
+
+ for (int pos = 0; pos < childGroupByDesc.getAggregators().size(); pos++) {
+ AggregationDesc aggr = childGroupByDesc.getAggregators().get(pos);
+ // Partial aggregation is not done for distincts on the mapper
+ // However, if the data is bucketed/sorted on the distinct key, partial aggregation
+ // can be performed on the mapper.
+ if (aggr.getDistinct()) {
+ ArrayList<ExprNodeDesc> parameters = new ArrayList<ExprNodeDesc>();
+ ExprNodeDesc param = aggr.getParameters().get(0);
+ assert param instanceof ExprNodeColumnDesc;
+ ExprNodeColumnDesc paramC = (ExprNodeColumnDesc) param;
+ paramC.setIsPartitionColOrVirtualCol(false);
+ paramC.setColumn("VALUE._col" + pos);
+ parameters.add(paramC);
+ aggr.setParameters(parameters);
+ aggr.setDistinct(false);
+ aggr.setMode(Mode.FINAL);
+ }
+ }
+ // Partial aggregation is performed on the mapper, no distinct processing at the reducer
+ childGroupByDesc.setDistinct(false);
+ groupByOpDesc.setDontResetAggrsDistinct(true);
+ groupByOpDesc.setBucketGroup(true);
+ groupByOp.setUseBucketizedHiveInputFormat(true);
+ // no distinct processing at the reducer
+ // A query like 'select count(distinct key) from T' is transformed into
+ // 'select count(key) from T' as far as the reducer is concerned.
+ reduceSinkOp.getConf().setDistinctColumnIndices(new ArrayList<List<Integer>>());
+ }
+ else if (setBucketGroup) {
+ groupByOpDesc.setBucketGroup(true);
}
}
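
The reason bucketed and sorted input makes mapper-side distinct aggregation safe is that identical keys arrive adjacently, so a single remembered key replaces the usual set of all seen keys. A stand-alone illustration, not Hive operator code:

  class SortedDistinctSketch {
    // one pass, O(1) state: works only because equal keys are adjacent
    static long countDistinctSorted(String[] sortedKeys) {
      long distinct = 0;
      String prev = null;
      for (String key : sortedKeys) {
        if (prev == null || !prev.equals(key)) {
          ++distinct;  // first occurrence of a new key
        }
        prev = key;
      }
      return distinct;
    }
  }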
@@ -339,8 +409,8 @@ public class GroupByOptimizer implements
GroupByOptimizerSortMatch currentMatch =
notDeniedPartns.isEmpty() ? GroupByOptimizerSortMatch.NO_MATCH :
- notDeniedPartns.size() > 1 ? GroupByOptimizerSortMatch.PARTIAL_MATCH :
- GroupByOptimizerSortMatch.COMPLETE_MATCH;
+ notDeniedPartns.size() > 1 ? GroupByOptimizerSortMatch.PARTIAL_MATCH :
+ GroupByOptimizerSortMatch.COMPLETE_MATCH;
for (Partition part : notDeniedPartns) {
List<String> sortCols = part.getSortColNames();
List<String> bucketCols = part.getBucketCols();
@@ -440,8 +510,9 @@ public class GroupByOptimizer implements
case NO_MATCH:
return GroupByOptimizerSortMatch.NO_MATCH;
case COMPLETE_MATCH:
- return ((bucketCols != null) && !bucketCols.isEmpty() && sortCols.containsAll(bucketCols)) ?
- GroupByOptimizerSortMatch.COMPLETE_MATCH : GroupByOptimizerSortMatch.PARTIAL_MATCH;
+ return ((bucketCols != null) && !bucketCols.isEmpty() &&
+ sortCols.containsAll(bucketCols)) ?
+ GroupByOptimizerSortMatch.COMPLETE_MATCH : GroupByOptimizerSortMatch.PARTIAL_MATCH;
case PREFIX_COL1_MATCH:
return GroupByOptimizerSortMatch.NO_MATCH;
case PREFIX_COL2_MATCH: