Posted to commits@hive.apache.org by na...@apache.org on 2010/11/17 18:33:11 UTC
svn commit: r1036128 [2/19] - in /hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/o...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Wed Nov 17 17:33:06 2010
@@ -42,17 +42,20 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.util.ReflectionUtils;
-public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc>
- implements Serializable {
+
+public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> implements
+ Serializable {
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class
- .getName());
+ private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
// from abstract map join operator
/**
@@ -68,10 +71,8 @@ public class HashTableSinkOperator exten
*/
protected transient Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
- protected transient int posBigTableTag = -1; // one of the tables that is not
- // in memory
- protected transient int posBigTableAlias = -1; // one of the tables that is
- // not in memory
+ protected transient int posBigTableTag = -1; // one of the tables that is not in memory
+ protected transient int posBigTableAlias = -1; // one of the tables that is not in memory
transient int mapJoinRowsKey; // rows for a given key
protected transient RowContainer<ArrayList<Object>> emptyList = null;
@@ -114,6 +115,9 @@ public class HashTableSinkOperator exten
private long rowNumber = 0;
protected transient LogHelper console;
+ private long hashTableScale;
+ private boolean isAbort = false;
+
public static class HashTableSinkObjectCtx {
ObjectInspector standardOI;
@@ -125,8 +129,8 @@ public class HashTableSinkOperator exten
* @param standardOI
* @param serde
*/
- public HashTableSinkObjectCtx(ObjectInspector standardOI, SerDe serde,
- TableDesc tblDesc, Configuration conf) {
+ public HashTableSinkObjectCtx(ObjectInspector standardOI, SerDe serde, TableDesc tblDesc,
+ Configuration conf) {
this.standardOI = standardOI;
this.serde = serde;
this.tblDesc = tblDesc;
@@ -157,25 +161,28 @@ public class HashTableSinkOperator exten
}
+ private static final transient String[] FATAL_ERR_MSG = {
+ null, // counter value 0 means no error
+ "Mapside join size exceeds hive.mapjoin.maxsize. "
+ + "Please increase that or remove the mapjoin hint."};
private final int metadataKeyTag = -1;
transient int[] metadataValueTag;
transient int maxMapJoinSize;
+
public HashTableSinkOperator() {
- // super();
- console = new LogHelper(LOG, true);
}
public HashTableSinkOperator(MapJoinOperator mjop) {
this.conf = new HashTableSinkDesc(mjop.getConf());
- console = new LogHelper(LOG);
}
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
-
- maxMapJoinSize = HiveConf.getIntVar(hconf,
- HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
+ boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
+ console = new LogHelper(LOG, isSilent);
+ maxMapJoinSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
numMapRowsRead = 0;
firstRow = true;
@@ -187,8 +194,7 @@ public class HashTableSinkOperator exten
posBigTableAlias = order[posBigTableTag];
- // initialize some variables, which used to be initialized in
- // CommonJoinOperator
+ // initialize some variables, which used to be initialized in CommonJoinOperator
numAliases = conf.getExprs().size();
this.hconf = hconf;
totalSz = 0;
@@ -197,28 +203,25 @@ public class HashTableSinkOperator exten
// process join keys
joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
- JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), order,
- posBigTableAlias);
- joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(
- joinKeys, inputObjInspectors, posBigTableAlias);
+ JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), order, posBigTableAlias);
+ joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
+ inputObjInspectors, posBigTableAlias);
joinKeysStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
joinKeysObjectInspectors, posBigTableAlias);
// process join values
joinValues = new HashMap<Byte, List<ExprNodeEvaluator>>();
- JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), order,
- posBigTableAlias);
- joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(
- joinValues, inputObjInspectors, posBigTableAlias);
+ JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), order, posBigTableAlias);
+ joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
+ inputObjInspectors, posBigTableAlias);
joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
joinValuesObjectInspectors, posBigTableAlias);
// process join filters
joinFilters = new HashMap<Byte, List<ExprNodeEvaluator>>();
- JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(), order,
- posBigTableAlias);
- joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(
- joinFilters, inputObjInspectors, posBigTableAlias);
+ JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(), order, posBigTableAlias);
+ joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters,
+ inputObjInspectors, posBigTableAlias);
if (noOuterJoin) {
rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors;
@@ -231,8 +234,7 @@ public class HashTableSinkOperator exten
ArrayList<ObjectInspector> rcOIs = new ArrayList<ObjectInspector>();
rcOIs.addAll(joinValuesObjectInspectors.get(alias));
// for each alias, add object inspector for boolean as the last element
- rcOIs
- .add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
+ rcOIs.add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
rowContainerObjectInspectors.put(alias, rcOIs);
}
rowContainerStandardObjectInspectors = getStandardObjectInspectors(rowContainerObjectInspectors);
@@ -245,52 +247,62 @@ public class HashTableSinkOperator exten
mapJoinTables = new HashMap<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>>();
+ int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
+ float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
+ HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
+ float hashTableMaxMemoryUsage = HiveConf.getFloatVar(hconf,
+ HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
+ hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE);
+ if (hashTableScale <= 0) {
+ hashTableScale = 1;
+ }
+
// initialize the hash tables for other tables
for (Byte pos : order) {
if (pos == posBigTableTag) {
continue;
}
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
+ HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>(
+ hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage);
+
mapJoinTables.put(pos, hashTable);
}
}
+
+
protected static HashMap<Byte, List<ObjectInspector>> getStandardObjectInspectors(
Map<Byte, List<ObjectInspector>> aliasToObjectInspectors) {
HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
- for (Entry<Byte, List<ObjectInspector>> oiEntry : aliasToObjectInspectors
- .entrySet()) {
+ for (Entry<Byte, List<ObjectInspector>> oiEntry : aliasToObjectInspectors.entrySet()) {
Byte alias = oiEntry.getKey();
List<ObjectInspector> oiList = oiEntry.getValue();
- ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(
- oiList.size());
+ ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(oiList.size());
for (int i = 0; i < oiList.size(); i++) {
- fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList
- .get(i), ObjectInspectorCopyOption.WRITABLE));
+ fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i),
+ ObjectInspectorCopyOption.WRITABLE));
}
result.put(alias, fieldOIList);
}
return result;
+
}
- public void generateMapMetaData() throws Exception {
+ private void setKeyMetaData() throws SerDeException {
TableDesc keyTableDesc = conf.getKeyTblDesc();
- SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
- .getDeserializerClass(), null);
+ SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
+ null);
keySerializer.initialize(null, keyTableDesc.getProperties());
MapJoinMetaData.clear();
- MapJoinMetaData.put(Integer.valueOf(metadataKeyTag),
- new HashTableSinkObjectCtx(ObjectInspectorUtils
- .getStandardObjectInspector(keySerializer.getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE), keySerializer,
- keyTableDesc, hconf));
+ MapJoinMetaData.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
+ ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
+ ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, hconf));
}
/*
- * This operator only process small tables Read the key/value pairs Load them
- * into hashtable
+ * This operator only process small tables Read the key/value pairs Load them into hashtable
*/
@Override
public void processOp(Object row, int tag) throws HiveException {
@@ -298,20 +310,20 @@ public class HashTableSinkOperator exten
try {
if (firstRow) {
// generate the map metadata
- generateMapMetaData();
+ setKeyMetaData();
firstRow = false;
}
alias = order[tag];
// alias = (byte)tag;
// compute keys and values as StandardObjects
- AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys
- .get(alias), joinKeysObjectInspectors.get(alias));
+ AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys.get(alias),
+ joinKeysObjectInspectors.get(alias));
+
+ Object[] value = JoinUtil.computeMapJoinValues(row, joinValues.get(alias),
+ joinValuesObjectInspectors.get(alias), joinFilters.get(alias), joinFilterObjectInspectors
+ .get(alias), noOuterJoin);
- Object[] value = JoinUtil.computeMapJoinValues(row,
- joinValues.get(alias), joinValuesObjectInspectors.get(alias),
- joinFilters.get(alias), joinFilterObjectInspectors.get(alias),
- noOuterJoin);
HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables
.get((byte) tag);
@@ -326,24 +338,20 @@ public class HashTableSinkOperator exten
if (metadataValueTag[tag] == -1) {
metadataValueTag[tag] = order[tag];
-
- TableDesc valueTableDesc = conf.getValueTblDescs().get(tag);
- SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc
- .getDeserializerClass(), null);
- valueSerDe.initialize(null, valueTableDesc.getProperties());
-
- MapJoinMetaData.put(Integer.valueOf(metadataValueTag[tag]),
- new HashTableSinkObjectCtx(ObjectInspectorUtils
- .getStandardObjectInspector(valueSerDe.getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE), valueSerDe,
- valueTableDesc, hconf));
+ setValueMetaData(tag);
}
// Construct externalizable objects for key and value
if (needNewKey) {
- MapJoinObjectValue valueObj = new MapJoinObjectValue(
- metadataValueTag[tag], res);
+ MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
+
rowNumber++;
+ if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
+ isAbort = hashTable.isAbort(rowNumber, console);
+ if (isAbort) {
+ throw new HiveException("RunOutOfMeomoryUsage");
+ }
+ }
hashTable.put(keyMap, valueObj);
}
@@ -352,13 +360,34 @@ public class HashTableSinkOperator exten
res.add(value);
}
- } catch (Exception e) {
- e.printStackTrace();
+
+ } catch (SerDeException e) {
throw new HiveException(e);
}
}
+ private void setValueMetaData(int tag) throws SerDeException {
+ TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(tag);
+ SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
+ null);
+
+ valueSerDe.initialize(null, valueTableDesc.getProperties());
+
+ List<ObjectInspector> newFields = rowContainerStandardObjectInspectors.get((Byte) alias);
+ int length = newFields.size();
+ List<String> newNames = new ArrayList<String>(length);
+ for (int i = 0; i < length; i++) {
+ String tmp = new String("tmp_" + i);
+ newNames.add(tmp);
+ }
+ StandardStructObjectInspector standardOI = ObjectInspectorFactory
+ .getStandardStructObjectInspector(newNames, newFields);
+
+ MapJoinMetaData.put(Integer.valueOf(metadataValueTag[tag]), new HashTableSinkObjectCtx(
+ standardOI, valueSerDe, valueTableDesc, hconf));
+ }
+
@Override
public void closeOp(boolean abort) throws HiveException {
try {
@@ -371,31 +400,24 @@ public class HashTableSinkOperator exten
.entrySet()) {
// get the key and value
Byte tag = hashTables.getKey();
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = hashTables
- .getValue();
+ HashMapWrapper hashTable = hashTables.getValue();
// get current input file name
- String bigBucketFileName = this.getExecContext()
- .getCurrentBigBucketFile();
+ String bigBucketFileName = this.getExecContext().getCurrentBigBucketFile();
if (bigBucketFileName == null || bigBucketFileName.length() == 0) {
bigBucketFileName = "-";
}
// get the tmp URI path; it will be a hdfs path if not local mode
- String tmpURIPath = Utilities.generatePath(tmpURI, tag,
- bigBucketFileName);
- console.printInfo(Utilities.now()
- + "\tDump the hashtable into file: " + tmpURIPath);
+ String tmpURIPath = PathUtil.generatePath(tmpURI, tag, bigBucketFileName);
+ hashTable.isAbort(rowNumber, console);
+ console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
// get the hashtable file and path
Path path = new Path(tmpURIPath);
FileSystem fs = path.getFileSystem(hconf);
File file = new File(path.toUri().getPath());
fs.create(path);
-
fileLength = hashTable.flushMemoryCacheToPersistent(file);
-
- console.printInfo(Utilities.now() + "\t Processing rows: "
- + rowNumber + "\t key number:" + hashTable.size());
- console.printInfo("Upload 1 File to: " + tmpURIPath + " File size: "
+ console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
+ fileLength);
hashTable.close();
@@ -411,7 +433,7 @@ public class HashTableSinkOperator exten
/**
* Implements the getName function for the Node Interface.
- *
+ *
* @return the name of the operator
*/
@Override
@@ -424,4 +446,6 @@ public class HashTableSinkOperator exten
return OperatorType.HASHTABLESINK;
}
+
+
}
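
For readers skimming the patch: the new hashTableScale/isAbort logic above amounts to a periodic heap check while rows are loaded into the small-table hash table. The following stand-alone sketch is not part of the commit; the scale and threshold values are placeholders for HIVEHASHTABLESCALE and HIVEHASHTABLEMAXMEMORYUSAGE, and the loop stands in for HashTableSinkOperator.processOp(). It only illustrates the shape of the check, which HashMapWrapper.isAbort() performs with the same MemoryMXBean approach (the real code also calls System.gc() before measuring).

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;

    public class MemoryCheckSketch {
      private static final MemoryMXBean MEM = ManagementFactory.getMemoryMXBean();

      // True when used heap exceeds the configured fraction of the max heap,
      // mirroring HashMapWrapper.isAbort() in this commit.
      static boolean overLimit(float maxMemoryUsage) {
        long used = MEM.getHeapMemoryUsage().getUsed();
        long max = MEM.getHeapMemoryUsage().getMax();
        return ((double) used / (double) max) > maxMemoryUsage;
      }

      // Placeholder loop standing in for loading the small table's rows.
      static void loadRows(long totalRows) {
        long hashTableScale = 100000L; // placeholder for HIVEHASHTABLESCALE
        float maxMemoryUsage = 0.90f;  // placeholder for HIVEHASHTABLEMAXMEMORYUSAGE
        for (long rowNumber = 1; rowNumber <= totalRows; rowNumber++) {
          // ... put the row into the in-memory hash table here ...
          // Only pay for the check once every hashTableScale rows.
          if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0
              && overLimit(maxMemoryUsage)) {
            throw new RuntimeException("hash table no longer fits in memory, aborting");
          }
        }
      }

      public static void main(String[] args) {
        loadRows(500000L);
      }
    }
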
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Wed Nov 17 17:33:06 2010
@@ -409,4 +409,7 @@ public class MapRedTask extends ExecDriv
return null;
}
+
+
+
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Wed Nov 17 17:33:06 2010
@@ -57,7 +57,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
-public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {
+public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {
private Map<String, FetchOperator> fetchOperators;
private JobConf job;
@@ -67,11 +67,11 @@ public class MapredLocalTask extends Ta
static final String[] HIVE_SYS_PROP = {"build.dir", "build.dir.hive"};
public static MemoryMXBean memoryMXBean;
- // not sure we need this exec context; but all the operators in the work
+ // not sure we need this exec context; but all the operators in the work
// will pass this context throught
private final ExecMapperContext execContext = new ExecMapperContext();
- public MapredLocalTask(){
+ public MapredLocalTask() {
super();
}
@@ -83,25 +83,23 @@ public class MapredLocalTask extends Ta
}
@Override
- public void initialize(HiveConf conf, QueryPlan queryPlan,
- DriverContext driverContext) {
+ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
super.initialize(conf, queryPlan, driverContext);
job = new JobConf(conf, ExecDriver.class);
}
- public static String now(){
+ public static String now() {
Calendar cal = Calendar.getInstance();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss");
- return sdf.format(cal.getTime());
+ return sdf.format(cal.getTime());
}
@Override
- public int execute(DriverContext driverContext){
- try{
- //generate the cmd line to run in the child jvm
- //String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
+ public int execute(DriverContext driverContext) {
+ try {
+ // generate the cmd line to run in the child jvm
Context ctx = driverContext.getCtx();
String hiveJar = conf.getJar();
@@ -115,16 +113,15 @@ public class MapredLocalTask extends Ta
LOG.info("Generating plan file " + planPath.toString());
Utilities.serializeMapRedLocalWork(plan, out);
- String isSilent = "true".equalsIgnoreCase(System
- .getProperty("test.silent")) ? "-nolog" : "";
+ String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent")) ? "-nolog" : "";
String jarCmd;
- jarCmd = hiveJar + " " + ExecDriver.class.getName() ;
+ jarCmd = hiveJar + " " + ExecDriver.class.getName();
String hiveConfArgs = ExecDriver.generateCmdLine(conf);
- String cmdLine = hadoopExec + " jar " + jarCmd + " -localtask -plan "
- + planPath.toString() + " " + isSilent + " " + hiveConfArgs;
+ String cmdLine = hadoopExec + " jar " + jarCmd + " -localtask -plan " + planPath.toString()
+ + " " + isSilent + " " + hiveConfArgs;
String workDir = (new File(".")).getCanonicalPath();
String files = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.FILE);
@@ -134,16 +131,16 @@ public class MapredLocalTask extends Ta
workDir = (new Path(ctx.getLocalTmpFileURI())).toUri().getPath();
- if (! (new File(workDir)).mkdir()) {
- throw new IOException ("Cannot create tmp working dir: " + workDir);
+ if (!(new File(workDir)).mkdir()) {
+ throw new IOException("Cannot create tmp working dir: " + workDir);
}
- for (String f: StringUtils.split(files, ',')) {
+ for (String f : StringUtils.split(files, ',')) {
Path p = new Path(f);
String target = p.toUri().getPath();
String link = workDir + Path.SEPARATOR + p.getName();
if (FileUtil.symLink(target, link) != 0) {
- throw new IOException ("Cannot link to added file: " + target + " from: " + link);
+ throw new IOException("Cannot link to added file: " + target + " from: " + link);
}
}
}
@@ -166,31 +163,30 @@ public class MapredLocalTask extends Ta
Map<String, String> variables = new HashMap(System.getenv());
// The user can specify the hadoop memory
- //if ("local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT))) {
- // if we are running in local mode - then the amount of memory used
- // by the child jvm can no longer default to the memory used by the
- // parent jvm
- //int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
- int hadoopMem= conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);;
+ // if ("local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT))) {
+ // if we are running in local mode - then the amount of memory used
+ // by the child jvm can no longer default to the memory used by the
+ // parent jvm
+ // int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
+ int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
if (hadoopMem == 0) {
// remove env var that would default child jvm to use parent's memory
// as default. child jvm would use default memory for a hadoop client
variables.remove(HADOOP_MEM_KEY);
} else {
// user specified the memory for local mode hadoop run
- console.printInfo(" set heap size\t"+hadoopMem+"MB");
+ console.printInfo(" set heap size\t" + hadoopMem + "MB");
variables.put(HADOOP_MEM_KEY, String.valueOf(hadoopMem));
}
- //} else {
- // nothing to do - we are not running in local mode - only submitting
- // the job via a child process. in this case it's appropriate that the
- // child jvm use the same memory as the parent jvm
+ // } else {
+ // nothing to do - we are not running in local mode - only submitting
+ // the job via a child process. in this case it's appropriate that the
+ // child jvm use the same memory as the parent jvm
- //}
+ // }
if (variables.containsKey(HADOOP_OPTS_KEY)) {
- variables.put(HADOOP_OPTS_KEY, variables.get(HADOOP_OPTS_KEY)
- + hadoopOpts);
+ variables.put(HADOOP_OPTS_KEY, variables.get(HADOOP_OPTS_KEY) + hadoopOpts);
} else {
variables.put(HADOOP_OPTS_KEY, hadoopOpts);
}
@@ -205,10 +201,8 @@ public class MapredLocalTask extends Ta
// Run ExecDriver in another JVM
executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
- StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
- null, System.out);
- StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
- null, System.err);
+ StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
+ StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
outPrinter.start();
errPrinter.start();
@@ -217,10 +211,9 @@ public class MapredLocalTask extends Ta
if (exitVal != 0) {
LOG.error("Execution failed with exit status: " + exitVal);
- console.printError("Mapred Local Task Failed. Give up the map join stragery");
} else {
LOG.info("Execution completed successfully");
- console.printInfo("Mapred Local Task Running Successfully . Keep using map join stragery");
+ console.printInfo("Mapred Local Task Succeeded . Convert the Join into MapJoin");
}
return exitVal;
@@ -233,54 +226,56 @@ public class MapredLocalTask extends Ta
- public int executeFromChildJVM(DriverContext driverContext){
-
+ public int executeFromChildJVM(DriverContext driverContext) {
// check the local work
- if(work == null){
+ if (work == null) {
return -1;
}
memoryMXBean = ManagementFactory.getMemoryMXBean();
- console.printInfo(Utilities.now()+"\tStarting to luaunch local task to process map join ");
- console.printInfo("\tmaximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+ long startTime = System.currentTimeMillis();
+ console.printInfo(Utilities.now()
+ + "\tStarting to luaunch local task to process map join;\tmaximum memory = "
+ + memoryMXBean.getHeapMemoryUsage().getMax());
fetchOperators = new HashMap<String, FetchOperator>();
Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
execContext.setJc(job);
- //set the local work, so all the operator can get this context
+ // set the local work, so all the operator can get this context
execContext.setLocalWork(work);
boolean inputFileChangeSenstive = work.getInputFileChangeSensitive();
- try{
+ try {
initializeOperators(fetchOpJobConfMap);
- //for each big table's bucket, call the start forward
- if(inputFileChangeSenstive){
- for( LinkedHashMap<String, ArrayList<String>> bigTableBucketFiles:
- work.getBucketMapjoinContext().getAliasBucketFileNameMapping().values()){
- for(String bigTableBucket: bigTableBucketFiles.keySet()){
- startForward(inputFileChangeSenstive,bigTableBucket);
+ // for each big table's bucket, call the start forward
+ if (inputFileChangeSenstive) {
+ for (LinkedHashMap<String, ArrayList<String>> bigTableBucketFiles : work
+ .getBucketMapjoinContext().getAliasBucketFileNameMapping().values()) {
+ for (String bigTableBucket : bigTableBucketFiles.keySet()) {
+ startForward(inputFileChangeSenstive, bigTableBucket);
}
}
- }else{
- startForward(inputFileChangeSenstive,null);
+ } else {
+ startForward(inputFileChangeSenstive, null);
}
- console.printInfo(now()+"\tEnd of local task ");
+ long currentTime = System.currentTimeMillis();
+ long elapsed = currentTime - startTime;
+ console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: "
+ + Utilities.showTime(elapsed) + " sec.");
} catch (Throwable e) {
- if (e instanceof OutOfMemoryError) {
+ if (e instanceof OutOfMemoryError
+ || (e instanceof HiveException && e.getMessage().equals("RunOutOfMeomoryUsage"))) {
// Don't create a new object if we are already out of memory
- l4j.error("Out of Memory Error");
- console.printError("[Warning] Small table is too large to put into memory");
- return 2;
+ return 3;
} else {
l4j.error("Hive Runtime Error: Map local work failed");
e.printStackTrace();
+ return 2;
}
- }finally{
- console.printInfo(Utilities.now()+"\tFinish running local task");
}
return 0;
}
private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
- throws Exception{
+ throws Exception {
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
int fetchOpRows = 0;
String alias = entry.getKey();
@@ -288,17 +283,17 @@ public class MapredLocalTask extends Ta
if (inputFileChangeSenstive) {
fetchOp.clearFetchContext();
- setUpFetchOpContext(fetchOp, alias,bigTableBucket);
+ setUpFetchOpContext(fetchOp, alias, bigTableBucket);
}
- //get the root operator
+ // get the root operator
Operator<? extends Serializable> forwardOp = work.getAliasToWork().get(alias);
- //walk through the operator tree
+ // walk through the operator tree
while (true) {
InspectableObject row = fetchOp.getNextRow();
if (row == null) {
if (inputFileChangeSenstive) {
- String fileName=this.getFileName(bigTableBucket);
+ String fileName = this.getFileName(bigTableBucket);
execContext.setCurrentBigBucketFile(fileName);
forwardOp.reset();
}
@@ -310,22 +305,23 @@ public class MapredLocalTask extends Ta
// check if any operator had a fatal error or early exit during
// execution
if (forwardOp.getDone()) {
- //ExecMapper.setDone(true);
+ // ExecMapper.setDone(true);
break;
}
}
}
}
+
private void initializeOperators(Map<FetchOperator, JobConf> fetchOpJobConfMap)
- throws HiveException{
+ throws HiveException {
// this mapper operator is used to initialize all the operators
for (Map.Entry<String, FetchWork> entry : work.getAliasToFetchWork().entrySet()) {
JobConf jobClone = new JobConf(job);
Operator<? extends Serializable> tableScan = work.getAliasToWork().get(entry.getKey());
boolean setColumnsNeeded = false;
- if(tableScan instanceof TableScanOperator) {
- ArrayList<Integer> list = ((TableScanOperator)tableScan).getNeededColumnIDs();
+ if (tableScan instanceof TableScanOperator) {
+ ArrayList<Integer> list = ((TableScanOperator) tableScan).getNeededColumnIDs();
if (list != null) {
ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
setColumnsNeeded = true;
@@ -336,18 +332,18 @@ public class MapredLocalTask extends Ta
ColumnProjectionUtils.setFullyReadColumns(jobClone);
}
- //create a fetch operator
- FetchOperator fetchOp = new FetchOperator(entry.getValue(),jobClone);
+ // create a fetch operator
+ FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone);
fetchOpJobConfMap.put(fetchOp, jobClone);
fetchOperators.put(entry.getKey(), fetchOp);
l4j.info("fetchoperator for " + entry.getKey() + " created");
}
- //initilize all forward operator
+ // initilize all forward operator
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
- //get the forward op
+ // get the forward op
Operator<? extends Serializable> forwardOp = work.getAliasToWork().get(entry.getKey());
- //put the exe context into all the operators
+ // put the exe context into all the operators
forwardOp.setExecContext(execContext);
// All the operators need to be initialized before process
FetchOperator fetchOp = entry.getValue();
@@ -356,54 +352,58 @@ public class MapredLocalTask extends Ta
if (jobConf == null) {
jobConf = job;
}
- //initialize the forward operator
+ // initialize the forward operator
forwardOp.initialize(jobConf, new ObjectInspector[] {fetchOp.getOutputObjectInspector()});
l4j.info("fetchoperator for " + entry.getKey() + " initialized");
}
}
- private void setUpFetchOpContext(FetchOperator fetchOp, String alias,String currentInputFile)
- throws Exception {
+ private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile)
+ throws Exception {
- BucketMapJoinContext bucketMatcherCxt = this.work
- .getBucketMapjoinContext();
+ BucketMapJoinContext bucketMatcherCxt = this.work.getBucketMapjoinContext();
- Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
- .getBucketMatcherClass();
- BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
- bucketMatcherCls, null);
- bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
- .getAliasBucketFileNameMapping());
+ Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass();
+ BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(bucketMatcherCls,
+ null);
+ bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt.getAliasBucketFileNameMapping());
- List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
- bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
+ List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile, bucketMatcherCxt
+ .getMapJoinBigTableAlias(), alias);
Iterator<Path> iter = aliasFiles.iterator();
fetchOp.setupContext(iter, null);
}
- private String getFileName(String path){
- if(path== null || path.length()==0) {
+ private String getFileName(String path) {
+ if (path == null || path.length() == 0) {
return null;
}
- int last_separator = path.lastIndexOf(Path.SEPARATOR)+1;
+ int last_separator = path.lastIndexOf(Path.SEPARATOR) + 1;
String fileName = path.substring(last_separator);
return fileName;
}
+
@Override
- public void localizeMRTmpFilesImpl(Context ctx){
+ public void localizeMRTmpFilesImpl(Context ctx) {
}
@Override
+ public boolean isMapRedLocalTask() {
+ return true;
+ }
+
+ @Override
public String getName() {
return "MAPREDLOCAL";
}
+
@Override
public int getType() {
- //assert false;
+ // assert false;
return StageType.MAPREDLOCAL;
}
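
As a reading aid, the return values of executeFromChildJVM() after this change can be summarized in a small constants class. This class is hypothetical and not part of the patch; the names are invented, only the numeric values come from the diff above.

    // Hypothetical helper, not in the commit: names for the exit codes
    // that executeFromChildJVM() returns after this change.
    public final class LocalTaskExitCodes {
      public static final int SUCCESS = 0;          // local task completed
      public static final int NO_LOCAL_WORK = -1;   // work == null
      public static final int RUNTIME_FAILURE = 2;  // any other Throwable
      public static final int OUT_OF_MEMORY = 3;    // OutOfMemoryError or "RunOutOfMeomoryUsage"
      private LocalTaskExitCodes() {}
    }
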
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Nov 17 17:33:06 2010
@@ -194,12 +194,54 @@ public final class OperatorFactory {
* Returns an operator given the conf and a list of parent operators.
*/
public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+ List<Operator<? extends Serializable>> oplist) {
+ Operator<T> ret = get((Class<T>) conf.getClass());
+ ret.setConf(conf);
+ if (oplist.size() == 0) {
+ return (ret);
+ }
+
+ // Add the new operator as child of each of the passed in operators
+ for (Operator op : oplist) {
+ List<Operator> children = op.getChildOperators();
+ if (children == null) {
+ children = new ArrayList<Operator>();
+ }
+ children.add(ret);
+ op.setChildOperators(children);
+ }
+
+ // add parents for the newly created operator
+ List<Operator<? extends Serializable>> parent = new ArrayList<Operator<? extends Serializable>>();
+ for (Operator op : oplist) {
+ parent.add(op);
+ }
+
+ ret.setParentOperators(parent);
+
+ return (ret);
+ }
+
+ /**
+ * Returns an operator given the conf and a list of parent operators.
+ */
+ public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
RowSchema rwsch, Operator... oplist) {
Operator<T> ret = getAndMakeChild(conf, oplist);
ret.setSchema(rwsch);
return (ret);
}
+ /**
+ * Returns an operator given the conf and a list of parent operators.
+ */
+ public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+ RowSchema rwsch, List<Operator<? extends Serializable>> oplist) {
+ Operator<T> ret = getAndMakeChild(conf, oplist);
+ ret.setSchema(rwsch);
+ return (ret);
+ }
+
private OperatorFactory() {
// prevent instantiation
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java?rev=1036128&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java Wed Nov 17 17:33:06 2010
@@ -0,0 +1,20 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.fs.Path;
+
+public class PathUtil {
+ public static String suffix=".hashtable";
+ public static String generatePath(String baseURI,Byte tag,String bigBucketFileName){
+ String path = new String(baseURI+Path.SEPARATOR+"-"+tag+"-"+bigBucketFileName+suffix);
+ return path;
+ }
+ public static String generateFileName(Byte tag,String bigBucketFileName){
+ String fileName = new String("-"+tag+"-"+bigBucketFileName+suffix);
+ return fileName;
+ }
+
+ public static String generateTmpURI(String baseURI,String id){
+ String tmpFileURI = new String(baseURI+Path.SEPARATOR+"HashTable-"+id);
+ return tmpFileURI;
+ }
+}
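
A quick usage sketch of the new PathUtil helper added above; the URI, tag and bucket file name below are made-up values, and the expected outputs are shown in the comments.

    import org.apache.hadoop.hive.ql.exec.PathUtil;

    public class PathUtilExample {
      public static void main(String[] args) {
        String tmpURI = "hdfs://nn:8020/tmp/hive/HashTable-stage-1"; // made up
        Byte tag = 1;
        String bigBucketFileName = "bucket_00000";                   // made up

        // hdfs://nn:8020/tmp/hive/HashTable-stage-1/-1-bucket_00000.hashtable
        System.out.println(PathUtil.generatePath(tmpURI, tag, bigBucketFileName));

        // -1-bucket_00000.hashtable
        System.out.println(PathUtil.generateFileName(tag, bigBucketFileName));

        // hdfs://nn:8020/tmp/hive/HashTable-stage-1
        System.out.println(PathUtil.generateTmpURI("hdfs://nn:8020/tmp/hive", "stage-1"));
      }
    }
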
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Wed Nov 17 17:33:06 2010
@@ -42,8 +42,7 @@ import org.apache.hadoop.util.StringUtil
* Task implementation.
**/
-public abstract class Task<T extends Serializable> implements Serializable,
- Node {
+public abstract class Task<T extends Serializable> implements Serializable, Node {
private static final long serialVersionUID = 1L;
protected transient boolean started;
@@ -59,6 +58,9 @@ public abstract class Task<T extends Ser
protected transient HashMap<String, Long> taskCounters;
protected transient DriverContext driverContext;
protected transient boolean clonedConf = false;
+ protected Task<? extends Serializable> backupTask;
+ protected List<Task<? extends Serializable>> backupChildrenTasks = new ArrayList<Task<? extends Serializable>>();
+
// Descendants tasks who subscribe feeds from this task
protected transient List<Task<? extends Serializable>> feedSubscribers;
@@ -81,8 +83,7 @@ public abstract class Task<T extends Ser
this.taskCounters = new HashMap<String, Long>();
}
- public void initialize(HiveConf conf, QueryPlan queryPlan,
- DriverContext driverContext) {
+ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
this.queryPlan = queryPlan;
isdone = false;
started = false;
@@ -103,8 +104,8 @@ public abstract class Task<T extends Ser
}
/**
- * This method is called in the Driver on every task. It updates counters and
- * calls execute(), which is overridden in each task
+ * This method is called in the Driver on every task. It updates counters and calls execute(),
+ * which is overridden in each task
*
* @return return value of execute()
*/
@@ -127,8 +128,7 @@ public abstract class Task<T extends Ser
}
/**
- * This method is overridden in each Task. TODO execute should return a
- * TaskHandle.
+ * This method is overridden in each Task. TODO execute should return a TaskHandle.
*
* @return status of executing the task
*/
@@ -160,10 +160,61 @@ public abstract class Task<T extends Ser
return parentTasks;
}
+ public Task<? extends Serializable> getBackupTask() {
+ return backupTask;
+ }
+
+
+ public void setBackupTask(Task<? extends Serializable> backupTask) {
+ this.backupTask = backupTask;
+ }
+
+ public List<Task<? extends Serializable>> getBackupChildrenTasks() {
+ return backupChildrenTasks;
+ }
+
+ public void setBackupChildrenTasks(List<Task<? extends Serializable>> backupChildrenTasks) {
+ this.backupChildrenTasks = backupChildrenTasks;
+ }
+
+ public Task<? extends Serializable> getAndInitBackupTask() {
+ if (backupTask != null) {
+ // first set back the backup task with its children task.
+ for (Task<? extends Serializable> backupChild : backupChildrenTasks) {
+ backupChild.getParentTasks().add(backupTask);
+ }
+
+ // recursively remove task from its children tasks if this task doesn't have any parent task
+ this.removeFromChildrenTasks();
+ }
+ return backupTask;
+ }
+
+ public void removeFromChildrenTasks() {
+
+ List<Task<? extends Serializable>> childrenTasks = this.getChildTasks();
+ if (childrenTasks == null) {
+ return;
+ }
+
+ for (Task<? extends Serializable> childTsk : childrenTasks) {
+ // remove this task from its children tasks
+ childTsk.getParentTasks().remove(this);
+
+ // recursively remove non-parent task from its children
+ List<Task<? extends Serializable>> siblingTasks = childTsk.getParentTasks();
+ if (siblingTasks == null || siblingTasks.size() == 0) {
+ childTsk.removeFromChildrenTasks();
+ }
+ }
+
+ return;
+ }
+
+
/**
- * The default dependent tasks are just child tasks, but different types
- * could implement their own (e.g. ConditionalTask will use the listTasks
- * as dependents).
+ * The default dependent tasks are just child tasks, but different types could implement their own
+ * (e.g. ConditionalTask will use the listTasks as dependents).
*
* @return a list of tasks that are dependent on this task.
*/
@@ -172,8 +223,8 @@ public abstract class Task<T extends Ser
}
/**
- * Add a dependent task on the current task. Return if the dependency already
- * existed or is this a new one
+ * Add a dependent task on the current task. Return if the dependency already existed or is this a
+ * new one
*
* @return true if the task got added false if it already existed
*/
@@ -204,8 +255,7 @@ public abstract class Task<T extends Ser
public void removeDependentTask(Task<? extends Serializable> dependent) {
if ((getChildTasks() != null) && (getChildTasks().contains(dependent))) {
getChildTasks().remove(dependent);
- if ((dependent.getParentTasks() != null)
- && (dependent.getParentTasks().contains(this))) {
+ if ((dependent.getParentTasks() != null) && (dependent.getParentTasks().contains(this))) {
dependent.getParentTasks().remove(this);
}
}
@@ -279,6 +329,10 @@ public abstract class Task<T extends Ser
return false;
}
+ public boolean isMapRedLocalTask() {
+ return false;
+ }
+
public boolean hasReduce() {
return false;
}
@@ -288,8 +342,7 @@ public abstract class Task<T extends Ser
}
/**
- * Should be overridden to return the type of the specific task among the
- * types in TaskType.
+ * Should be overridden to return the type of the specific task among the types in TaskType.
*
* @return TaskTypeType.* or -1 if not overridden
*/
@@ -299,21 +352,23 @@ public abstract class Task<T extends Ser
}
/**
- * If this task uses any map-reduce intermediate data (either for reading
- * or for writing), localize them (using the supplied Context). Map-Reduce
- * intermediate directories are allocated using Context.getMRTmpFileURI()
- * and can be localized using localizeMRTmpFileURI().
+ * If this task uses any map-reduce intermediate data (either for reading or for writing),
+ * localize them (using the supplied Context). Map-Reduce intermediate directories are allocated
+ * using Context.getMRTmpFileURI() and can be localized using localizeMRTmpFileURI().
*
- * This method is declared abstract to force any task code to explicitly
- * deal with this aspect of execution.
+ * This method is declared abstract to force any task code to explicitly deal with this aspect of
+ * execution.
*
- * @param ctx context object with which to localize
+ * @param ctx
+ * context object with which to localize
*/
abstract protected void localizeMRTmpFilesImpl(Context ctx);
/**
* Localize a task tree
- * @param ctx context object with which to localize
+ *
+ * @param ctx
+ * context object with which to localize
*/
public final void localizeMRTmpFiles(Context ctx) {
localizeMRTmpFilesImpl(ctx);
@@ -322,7 +377,7 @@ public abstract class Task<T extends Ser
return;
}
- for (Task<? extends Serializable> t: childTasks) {
+ for (Task<? extends Serializable> t : childTasks) {
t.localizeMRTmpFiles(ctx);
}
}
@@ -330,12 +385,13 @@ public abstract class Task<T extends Ser
/**
* Subscribe the feed of publisher. To prevent cycles, a task can only subscribe to its ancestor.
* Feed is a generic form of execution-time feedback (type, value) pair from one task to another
- * task. Examples include dynamic partitions (which are only available at execution time).
- * The MoveTask may pass the list of dynamic partitions to the StatsTask since after the
- * MoveTask the list of dynamic partitions are lost (MoveTask moves them to the table's
- * destination directory which is mixed with old partitions).
+ * task. Examples include dynamic partitions (which are only available at execution time). The
+ * MoveTask may pass the list of dynamic partitions to the StatsTask since after the MoveTask the
+ * list of dynamic partitions are lost (MoveTask moves them to the table's destination directory
+ * which is mixed with old partitions).
*
- * @param publisher this feed provider.
+ * @param publisher
+ * this feed provider.
*/
public void subscribeFeed(Task<? extends Serializable> publisher) {
if (publisher != this && publisher.ancestorOrSelf(this)) {
@@ -353,7 +409,7 @@ public abstract class Task<T extends Ser
}
List<Task<? extends Serializable>> deps = getDependentTasks();
if (deps != null) {
- for (Task<? extends Serializable> d: deps) {
+ for (Task<? extends Serializable> d : deps) {
if (d.ancestorOrSelf(desc)) {
return true;
}
@@ -373,7 +429,7 @@ public abstract class Task<T extends Ser
// push the feed to its subscribers
protected void pushFeed(FeedType feedType, Object feedValue) {
if (feedSubscribers != null) {
- for (Task<? extends Serializable> s: feedSubscribers) {
+ for (Task<? extends Serializable> s : feedSubscribers) {
s.receiveFeed(feedType, feedValue);
}
}
@@ -383,10 +439,10 @@ public abstract class Task<T extends Ser
protected void receiveFeed(FeedType feedType, Object feedValue) {
}
- protected void cloneConf () {
+ protected void cloneConf() {
if (!clonedConf) {
clonedConf = true;
conf = new HiveConf(conf);
}
}
-}
\ No newline at end of file
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Wed Nov 17 17:33:06 2010
@@ -73,7 +73,8 @@ public final class TaskFactory {
taskvec.add(new taskTuple<MapredLocalWork>(MapredLocalWork.class,
MapredLocalTask.class));
taskvec.add(new taskTuple<StatsWork>(StatsWork.class,
- StatsTask.class));
+ StatsTask.class));
+
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Nov 17 17:33:06 2010
@@ -49,6 +49,7 @@ import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -1591,4 +1592,9 @@ public final class Utilities {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
return sdf.format(cal.getTime());
}
+
+ public static String showTime(long time) {
+ SimpleDateFormat sdf = new SimpleDateFormat("ss");
+ return sdf.format(new Date(time));
+ }
}
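
Note that showTime() runs the elapsed milliseconds through a SimpleDateFormat whose pattern is just "ss", so only the seconds field is printed and whole minutes are not carried over. A small illustration, not part of the patch, pinned to UTC so the output is deterministic:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class ShowTimeExample {
      public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("ss");
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(sdf.format(new Date(9000)));   // 09
        System.out.println(sdf.format(new Date(42000)));  // 42
        System.out.println(sdf.format(new Date(65000)));  // 05 (the minute is dropped)
      }
    }
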
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Wed Nov 17 17:33:06 2010
@@ -33,6 +33,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -43,20 +44,20 @@ import org.apache.hadoop.hive.ql.session
* the main memory hash table exceeds a certain threshold, new elements will go into the persistent
* hash table.
*/
+
public class HashMapWrapper<K, V> implements Serializable {
+ private static final long serialVersionUID = 1L;
protected Log LOG = LogFactory.getLog(this.getClass().getName());
// default threshold for using main memory based HashMap
+
private static final int THRESHOLD = 1000000;
private static final float LOADFACTOR = 0.75f;
+ private static final float MEMORYUSAGE = 1;
- private double threshold; // threshold to put data into persistent hash table
- // instead
+ private float maxMemoryUsage;
private HashMap<K, V> mHash; // main memory HashMap
-
-
-
protected transient LogHelper console;
private File dumpFile;
@@ -71,10 +72,9 @@ public class HashMapWrapper<K, V> implem
* @param threshold
* User specified threshold to store new values into persistent storage.
*/
- public HashMapWrapper(int threshold, float loadFactor) {
- this.threshold = 0.9;
+ public HashMapWrapper(int threshold, float loadFactor, float memoryUsage) {
+ maxMemoryUsage = memoryUsage;
mHash = new HashMap<K, V>(threshold, loadFactor);
- console = new LogHelper(LOG);
memoryMXBean = ManagementFactory.getMemoryMXBean();
maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
LOG.info("maximum memory: " + maxMemory);
@@ -83,30 +83,28 @@ public class HashMapWrapper<K, V> implem
}
public HashMapWrapper(int threshold) {
- this(THRESHOLD, 0.75f);
+ this(threshold, LOADFACTOR, MEMORYUSAGE);
}
public HashMapWrapper() {
- this(THRESHOLD, LOADFACTOR);
+ this(THRESHOLD, LOADFACTOR, MEMORYUSAGE);
}
-
public V get(K key) {
return mHash.get(key);
}
-
public boolean put(K key, V value) throws HiveException {
// isAbort();
mHash.put(key, value);
return false;
}
+
public void remove(K key) {
mHash.remove(key);
}
-
/**
* Flush the main memory hash table into the persistent cache file
*
@@ -146,7 +144,6 @@ public class HashMapWrapper<K, V> implem
* @throws HiveException
*/
public void close() throws HiveException {
- // isAbort();
mHash.clear();
}
@@ -158,36 +155,25 @@ public class HashMapWrapper<K, V> implem
return mHash.size();
}
- private boolean isAbort() {
- int size = mHash.size();
- // if(size >= 1000000 && size % 1000000 == 0 ){
+ public boolean isAbort(long numRows,LogHelper console) {
System.gc();
System.gc();
+ int size = mHash.size();
long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
double rate = (double) usedMemory / (double) maxMemory;
long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
- console.printInfo("Hashtable size:\t" + size + "\tMemory usage:\t" + usedMemory + "\t rate:\t"
- + num.format(rate));
- return true;
-
- }
-
- public Log getLOG() {
- return LOG;
+ console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
+ + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate));
+ if (rate > (double) maxMemoryUsage) {
+ return true;
+ }
+ return false;
}
public void setLOG(Log log) {
LOG = log;
}
- public double getThreshold() {
- return threshold;
- }
-
- public void setThreshold(double threshold) {
- this.threshold = threshold;
- }
-
public HashMap<K, V> getMHash() {
return mHash;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java Wed Nov 17 17:33:06 2010
@@ -30,7 +30,7 @@ public interface Dispatcher {
/**
* Dispatcher function.
- *
+ *
* @param nd
* operator to process.
* @param stack
@@ -43,4 +43,5 @@ public interface Dispatcher {
*/
Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
throws SemanticException;
+
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/TaskGraphWalker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/TaskGraphWalker.java?rev=1036128&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/TaskGraphWalker.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/TaskGraphWalker.java Wed Nov 17 17:33:06 2010
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lib;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * base class for operator graph walker this class takes list of starting ops
+ * and walks them one by one. it maintains list of walked operators
+ * (dispatchedList) and a list of operators that are discovered but not yet
+ * dispatched
+ */
+public class TaskGraphWalker implements GraphWalker {
+
+
+ public class TaskGraphWalkerContext{
+ private final HashMap<Node, Object> reMap;
+
+ public TaskGraphWalkerContext(HashMap<Node, Object> reMap){
+ this.reMap = reMap;
+ }
+ public void addToDispatchList(Node dispatchedObj){
+ if(dispatchedObj != null) {
+ retMap.put(dispatchedObj, null);
+ }
+ }
+ }
+
+ protected Stack<Node> opStack;
+ private final List<Node> toWalk = new ArrayList<Node>();
+ private final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
+ private final Dispatcher dispatcher;
+ private final TaskGraphWalkerContext walkerCtx;
+
+ /**
+ * Constructor.
+ *
+ * @param disp
+ * dispatcher to call for each op encountered
+ */
+ public TaskGraphWalker(Dispatcher disp) {
+ dispatcher = disp;
+ opStack = new Stack<Node>();
+ walkerCtx = new TaskGraphWalkerContext(retMap);
+ }
+
+ /**
+ * @return the toWalk
+ */
+ public List<Node> getToWalk() {
+ return toWalk;
+ }
+
+ /**
+ * @return the doneList
+ */
+ public Set<Node> getDispatchedList() {
+ return retMap.keySet();
+ }
+
+ /**
+ * Dispatch the current operator.
+ *
+ * @param nd
+ * node being walked
+ * @param ndStack
+ * stack of nodes encountered
+ * @throws SemanticException
+ */
+ public void dispatch(Node nd, Stack<Node> ndStack,TaskGraphWalkerContext walkerCtx) throws SemanticException {
+ Object[] nodeOutputs = null;
+ if (nd.getChildren() != null) {
+ nodeOutputs = new Object[nd.getChildren().size()+1];
+ nodeOutputs[0] = walkerCtx;
+ int i = 1;
+ for (Node child : nd.getChildren()) {
+ nodeOutputs[i++] = retMap.get(child);
+ }
+ }else{
+ nodeOutputs = new Object[1];
+ nodeOutputs[0] = walkerCtx;
+ }
+
+ Object retVal = dispatcher.dispatch(nd, ndStack, nodeOutputs);
+ retMap.put(nd, retVal);
+ }
+
+ public void dispatch(Node nd, Stack<Node> ndStack) throws SemanticException {
+ Object[] nodeOutputs = null;
+ if (nd.getChildren() != null) {
+ nodeOutputs = new Object[nd.getChildren().size()];
+ int i = 1;
+ for (Node child : nd.getChildren()) {
+ nodeOutputs[i++] = retMap.get(child);
+ }
+ }
+
+ Object retVal = dispatcher.dispatch(nd, ndStack, nodeOutputs);
+ retMap.put(nd, retVal);
+ }
+
+ /**
+ * starting point for walking.
+ *
+ * @throws SemanticException
+ */
+ public void startWalking(Collection<Node> startNodes,
+ HashMap<Node, Object> nodeOutput) throws SemanticException {
+ toWalk.addAll(startNodes);
+ while (toWalk.size() > 0) {
+ Node nd = toWalk.remove(0);
+ walk(nd);
+ if (nodeOutput != null) {
+ nodeOutput.put(nd, retMap.get(nd));
+ }
+ }
+ }
+
+ /**
+ * walk the current operator and its descendants.
+ *
+ * @param nd
+ * current operator in the graph
+ * @throws SemanticException
+ */
+ public void walk(Node nd) throws SemanticException {
+ if(!(nd instanceof Task)){
+ throw new SemanticException("Task Graph Walker only walks for Task Graph");
+ }
+
+ if (getDispatchedList().contains(nd)) {
+ return;
+ }
+ if (opStack.empty() || nd != opStack.peek()) {
+ opStack.push(nd);
+ }
+
+ List<Task<? extends Serializable>> nextTaskList = null;
+ Set<Task<? extends Serializable>> nextTaskSet = new HashSet<Task<? extends Serializable>>();
+ List<Task<? extends Serializable>> taskListInConditionalTask = null;
+
+
+ if (nd instanceof ConditionalTask) {
+ // For a conditional task, the next-task list is the union of the child tasks of
+ // every task contained in the conditional task.
+ taskListInConditionalTask = ((ConditionalTask) nd).getListTasks();
+ for (Task<? extends Serializable> tsk : taskListInConditionalTask) {
+ List<Task<? extends Serializable>> childTask = tsk.getChildTasks();
+ if (childTask != null) {
+ nextTaskSet.addAll(childTask);
+ }
+ }
+ // convert the set into a list
+ if (nextTaskSet.size() > 0) {
+ nextTaskList = new ArrayList<Task<? extends Serializable>>();
+ for (Task<? extends Serializable> tsk : nextTaskSet) {
+ nextTaskList.add(tsk);
+ }
+ }
+ } else {
+ // For any other task, just use its child tasks.
+ nextTaskList = ((Task<? extends Serializable>) nd).getChildTasks();
+ }
+
+ if ((nextTaskList == null)
+ || getDispatchedList().containsAll(nextTaskList)) {
+ dispatch(nd, opStack, this.walkerCtx);
+ opStack.pop();
+ return;
+ }
+ // add children, self to the front of the queue in that order
+ getToWalk().add(0, nd);
+ getToWalk().removeAll(nextTaskList);
+ getToWalk().addAll(0, nextTaskList);
+
+ }
+}
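
For readers of this commit, a minimal usage sketch of the new walker follows. It is not part of the commit; it assumes the existing org.apache.hadoop.hive.ql.lib.Dispatcher, Node, Task and SemanticException types, and a hypothetical list of root tasks named rootTasks.

    // Hedged sketch: wire a no-op Dispatcher into the new TaskGraphWalker and walk
    // the task DAG starting from the (assumed) root tasks.
    Dispatcher disp = new Dispatcher() {
      public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
          throws SemanticException {
        // nodeOutputs[0] carries the TaskGraphWalkerContext; the remaining slots
        // hold the values previously returned for the node's children.
        return null;
      }
    };
    TaskGraphWalker walker = new TaskGraphWalker(disp);
    HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
    walker.startWalking(new ArrayList<Node>(rootTasks), nodeOutput);
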
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Wed Nov 17 17:33:06 2010
@@ -179,7 +179,7 @@ public final class ColumnPrunerProcFacto
cppCtx.getPrunedColLists().put((Operator<? extends Serializable>) nd,
cols);
ArrayList<Integer> needed_columns = new ArrayList<Integer>();
- RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRR();
+ RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver();
TableScanDesc desc = scanOp.getConf();
List<VirtualColumn> virtualCols = desc.getVirtualCols();
List<VirtualColumn> newVirtualCols = new ArrayList<VirtualColumn>();
@@ -232,7 +232,7 @@ public final class ColumnPrunerProcFacto
ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap = cppCtx
.getOpToParseCtxMap();
- RowResolver redSinkRR = opToParseCtxMap.get(op).getRR();
+ RowResolver redSinkRR = opToParseCtxMap.get(op).getRowResolver();
ReduceSinkDesc conf = op.getConf();
List<Operator<? extends Serializable>> childOperators = op
.getChildOperators();
@@ -250,7 +250,7 @@ public final class ColumnPrunerProcFacto
assert parentOperators.size() == 1;
Operator<? extends Serializable> par = parentOperators.get(0);
JoinOperator childJoin = (JoinOperator) childOperators.get(0);
- RowResolver parRR = opToParseCtxMap.get(par).getRR();
+ RowResolver parRR = opToParseCtxMap.get(par).getRowResolver();
List<String> childJoinCols = cppCtx.getJoinPrunedColLists().get(
childJoin).get((byte) conf.getTag());
boolean[] flags = new boolean[conf.getValueCols().size()];
@@ -383,7 +383,7 @@ public final class ColumnPrunerProcFacto
ArrayList<String> newOutputColumnNames = new ArrayList<String>();
ArrayList<ColumnInfo> rs_oldsignature = op.getSchema().getSignature();
ArrayList<ColumnInfo> rs_newsignature = new ArrayList<ColumnInfo>();
- RowResolver old_rr = cppCtx.getOpToParseCtxMap().get(op).getRR();
+ RowResolver old_rr = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
RowResolver new_rr = new RowResolver();
for (String col : cols) {
int index = originalOutputColumnNames.indexOf(col);
@@ -394,7 +394,7 @@ public final class ColumnPrunerProcFacto
ColumnInfo columnInfo = old_rr.get(tabcol[0], tabcol[1]);
new_rr.put(tabcol[0], tabcol[1], columnInfo);
}
- cppCtx.getOpToParseCtxMap().get(op).setRR(new_rr);
+ cppCtx.getOpToParseCtxMap().get(op).setRowResolver(new_rr);
op.getSchema().setSignature(rs_newsignature);
conf.setColList(newColList);
conf.setOutputColumnNames(newOutputColumnNames);
@@ -465,7 +465,7 @@ public final class ColumnPrunerProcFacto
Map<String, ExprNodeDesc> oldMap = reduce.getColumnExprMap();
Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>();
- RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(reduce).getRR();
+ RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(reduce).getRowResolver();
RowResolver newRR = new RowResolver();
ArrayList<String> originalValueOutputColNames = reduceConf
.getOutputValueColumnNames();
@@ -493,7 +493,7 @@ public final class ColumnPrunerProcFacto
ArrayList<ExprNodeDesc> keyCols = reduceConf.getKeyCols();
List<String> keys = new ArrayList<String>();
RowResolver parResover = cppCtx.getOpToParseCtxMap().get(
- reduce.getParentOperators().get(0)).getRR();
+ reduce.getParentOperators().get(0)).getRowResolver();
for (int i = 0; i < keyCols.size(); i++) {
keys = Utilities.mergeUniqElems(keys, keyCols.get(i).getCols());
}
@@ -506,7 +506,7 @@ public final class ColumnPrunerProcFacto
}
}
- cppCtx.getOpToParseCtxMap().get(reduce).setRR(newRR);
+ cppCtx.getOpToParseCtxMap().get(reduce).setRowResolver(newRR);
reduce.setColumnExprMap(newMap);
reduce.getSchema().setSignature(sig);
reduceConf.setOutputValueColumnNames(newOutputColNames);
@@ -614,7 +614,7 @@ public final class ColumnPrunerProcFacto
}
}
- RowResolver joinRR = cppCtx.getOpToParseCtxMap().get(op).getRR();
+ RowResolver joinRR = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
RowResolver newJoinRR = new RowResolver();
ArrayList<String> outputCols = new ArrayList<String>();
ArrayList<ColumnInfo> rs = new ArrayList<ColumnInfo>();
@@ -699,7 +699,7 @@ public final class ColumnPrunerProcFacto
op.setColumnExprMap(newColExprMap);
conf.setOutputColumnNames(outputCols);
op.getSchema().setSignature(rs);
- cppCtx.getOpToParseCtxMap().get(op).setRR(newJoinRR);
+ cppCtx.getOpToParseCtxMap().get(op).setRowResolver(newJoinRR);
cppCtx.getJoinPrunedColLists().put(op, prunedColLists);
}
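
The hunks above (and the ones in the files below) are a mechanical rename: every getRR()/setRR() call on OpParseContext becomes getRowResolver()/setRowResolver(). The OpParseContext change itself lives in another part of this commit, so the following is only an assumed sketch of the renamed accessors these call sites now depend on.

    // Hedged sketch of the assumed OpParseContext accessors after the rename;
    // the field name and method bodies are illustrative, not copied from the commit.
    public class OpParseContext {
      private RowResolver rowResolver;

      public RowResolver getRowResolver() {
        return rowResolver;
      }

      public void setRowResolver(RowResolver rowResolver) {
        this.rowResolver = rowResolver;
      }
    }
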
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Wed Nov 17 17:33:06 2010
@@ -217,7 +217,7 @@ public class GenMRFileSink1 implements N
// Add the extract operator to get the value fields
RowResolver out_rwsch = new RowResolver();
- RowResolver interim_rwsch = ctx.getParseCtx().getOpParseCtx().get(fsOp).getRR();
+ RowResolver interim_rwsch = ctx.getParseCtx().getOpParseCtx().get(fsOp).getRowResolver();
Integer pos = Integer.valueOf(0);
for (ColumnInfo colInfo : interim_rwsch.getColumnInfos()) {
String[] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Nov 17 17:33:06 2010
@@ -858,8 +858,9 @@ public final class GenMapRedUtils {
// create a dummy tableScan operator on top of op
// TableScanOperator is implicitly created here for each MapOperator
+ RowResolver rowResolver = opProcCtx.getParseCtx().getOpParseCtx().get(parent).getRowResolver();
Operator<? extends Serializable> ts_op = putOpInsertMap(OperatorFactory
- .get(TableScanDesc.class, parent.getSchema()), null, parseCtx);
+ .get(TableScanDesc.class, parent.getSchema()), rowResolver, parseCtx);
childOpList = new ArrayList<Operator<? extends Serializable>>();
childOpList.add(op);