Posted to commits@hive.apache.org by he...@apache.org on 2010/11/12 07:12:47 UTC
svn commit: r1034276 [2/14] - in /hive/trunk: ./
ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/optimizer...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Nov 12 06:12:44 2010
@@ -17,9 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec;
-import java.io.File;
+
import java.io.Serializable;
-import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -30,15 +29,15 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.JDBMSinkOperator.JDBMSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.util.JoinUtil;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -48,28 +47,24 @@ import org.apache.hadoop.util.Reflection
/**
* Map side Join operator implementation.
*/
-public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implements
- Serializable {
+public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(MapJoinOperator.class
- .getName());
+ private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
- protected transient Map<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> mapJoinTables;
+ protected transient Map<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> mapJoinTables;
private static final transient String[] FATAL_ERR_MSG = {
null, // counter value 0 means no error
"Mapside join size exceeds hive.mapjoin.maxsize. "
- + "Please increase that or remove the mapjoin hint."
- };
-
-
-
+ + "Please increase that or remove the mapjoin hint."};
+ protected transient Map<Byte, MapJoinRowContainer<ArrayList<Object>>> rowContainerMap;
transient int metadataKeyTag;
transient int[] metadataValueTag;
transient int maxMapJoinSize;
private int bigTableAlias;
+
public MapJoinOperator() {
}
@@ -82,8 +77,7 @@ public class MapJoinOperator extends Abs
super.initializeOp(hconf);
- maxMapJoinSize = HiveConf.getIntVar(hconf,
- HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
+ maxMapJoinSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
metadataValueTag = new int[numAliases];
for (int pos = 0; pos < numAliases; pos++) {
@@ -93,24 +87,23 @@ public class MapJoinOperator extends Abs
metadataKeyTag = -1;
bigTableAlias = order[posBigTable];
- mapJoinTables = new HashMap<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>>();
-
+ mapJoinTables = new HashMap<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>>();
+ rowContainerMap = new HashMap<Byte, MapJoinRowContainer<ArrayList<Object>>>();
// initialize the hash tables for other tables
for (int pos = 0; pos < numAliases; pos++) {
if (pos == posBigTable) {
continue;
}
- int cacheSize = HiveConf.getIntVar(hconf,
- HiveConf.ConfVars.HIVEMAPJOINCACHEROWS);
- HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable = new HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>(
- cacheSize);
+ HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
mapJoinTables.put(Byte.valueOf((byte) pos), hashTable);
+ MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
+ rowContainerMap.put(Byte.valueOf((byte) pos), rowContainer);
}
- }
+ }
@Override
protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) {
@@ -118,113 +111,100 @@ public class MapJoinOperator extends Abs
+ FATAL_ERR_MSG[(int) counterCode]);
}
-
- public void generateMapMetaData() throws HiveException,SerDeException{
- //generate the meta data for key
- //index for key is -1
+ public void generateMapMetaData() throws HiveException, SerDeException {
+ // generate the meta data for key
+ // index for key is -1
TableDesc keyTableDesc = conf.getKeyTblDesc();
- SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(
- keyTableDesc.getDeserializerClass(), null);
+ SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
+ null);
keySerializer.initialize(null, keyTableDesc.getProperties());
- MapJoinMetaData.put(Integer.valueOf(metadataKeyTag),
- new JDBMSinkObjectCtx(
- ObjectInspectorUtils
- .getStandardObjectInspector(keySerializer
- .getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE), keySerializer,
- keyTableDesc, hconf));
+ MapJoinMetaData.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
+ ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
+ ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, hconf));
- //index for values is just alias
+ // index for values is just alias
for (int tag = 0; tag < order.length; tag++) {
int alias = (int) order[tag];
- if(alias == this.bigTableAlias){
+ if (alias == this.bigTableAlias) {
continue;
}
TableDesc valueTableDesc = conf.getValueTblDescs().get(tag);
- SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc
- .getDeserializerClass(), null);
+ SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
+ null);
valueSerDe.initialize(null, valueTableDesc.getProperties());
- MapJoinMetaData.put(Integer.valueOf(alias),
- new JDBMSinkObjectCtx(ObjectInspectorUtils
+ MapJoinMetaData.put(Integer.valueOf(alias), new HashTableSinkObjectCtx(ObjectInspectorUtils
.getStandardObjectInspector(valueSerDe.getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE), valueSerDe,
- valueTableDesc, hconf));
+ ObjectInspectorCopyOption.WRITABLE), valueSerDe, valueTableDesc, hconf));
}
}
- private void loadJDBM() throws HiveException{
+ private void loadHashTable() throws HiveException {
boolean localMode = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPJT).equals("local");
- String tmpURI =null;
- HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashtable;
+ String tmpURI = null;
+ HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashtable;
Byte pos;
- int alias;
- String currentInputFile = HiveConf.getVar(hconf,
- HiveConf.ConfVars.HADOOPMAPFILENAME);
+ String currentInputFile = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME);
+ LOG.info("******* Load from HashTable File: input : " + currentInputFile);
String currentFileName;
- if(this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
- currentFileName= this.getFileName(currentInputFile);
+ if (this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
+ currentFileName = this.getFileName(currentInputFile);
} else {
- currentFileName="-";
+ currentFileName = "-";
}
- LOG.info("******* Filename : "+ currentFileName);
- try{
- if(localMode){
- //load the jdbm file from tmp dir
- tmpURI= this.getExecContext().getLocalWork().getTmpFileURI();
- for(Map.Entry<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> entry: mapJoinTables.entrySet()){
+
+ try {
+ if (localMode) {
+ LOG.info("******* Load from tmp file uri ***");
+ tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
+ for (Map.Entry<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> entry : mapJoinTables
+ .entrySet()) {
pos = entry.getKey();
- hashtable=entry.getValue();
- URI uri = new URI(tmpURI+Path.SEPARATOR+"-"+pos+"-"+currentFileName+".jdbm");
- LOG.info("\tLoad back 1 JDBM file from tmp file uri:"+uri.toString());
- Path path = new Path(tmpURI+Path.SEPARATOR+"-"+pos+"-"+currentFileName+".jdbm");
- LOG.info("\tLoad back 1 JDBM file from tmp file uri:"+path.toString());
+ hashtable = entry.getValue();
+ String filePath = Utilities.generatePath(tmpURI, pos, currentFileName);
+ Path path = new Path(filePath);
+ LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString());
- File jdbmFile = new File(path.toUri());
- hashtable.initilizePersistentHash(jdbmFile);
+ hashtable.initilizePersistentHash(path.toUri().getPath());
+ }
+ } else {
+
+ Path[] localFiles = DistributedCache.getLocalCacheFiles(this.hconf);
+
+ for (Map.Entry<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> entry : mapJoinTables
+ .entrySet()) {
+ pos = entry.getKey();
+ hashtable = entry.getValue();
+ String suffix = Utilities.generateFileName(pos, currentFileName);
+ LOG.info("Looking for hashtable file with suffix: " + suffix);
+
+ boolean found = false;
+ for (int i = 0; i < localFiles.length; i++) {
+ Path path = localFiles[i];
+
+ if (path.toString().endsWith(suffix)) {
+ LOG.info("Matching suffix with cached file:" + path.toString());
+ LOG.info("\tInitializing the hashtable by cached file:" + path.toString());
+ hashtable.initilizePersistentHash(path.toString());
+ found = true;
+ LOG.info("\tLoad back 1 hashtable file from distributed cache:" + path.toString());
+ break;
+ }
+ }
+ if (!found) {
+ LOG.error("Load nothing from Distributed Cache");
+ throw new HiveException();
+ }
}
- }else{
- //load the jdbm file from distributed cache
- Path[] localFiles= DistributedCache.getLocalCacheFiles(this.hconf);
- for(int i = 0;i<localFiles.length; i++){
- Path path = localFiles[i];
- }
-
-
- for(Map.Entry<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> entry: mapJoinTables.entrySet()){
- pos = entry.getKey();
- hashtable=entry.getValue();
- String suffix="-"+pos+"-"+currentFileName+".jdbm";
- LOG.info("Looking for jdbm file with suffix: "+suffix);
-
- boolean found=false;
- for(int i = 0;i<localFiles.length; i++){
- Path path = localFiles[i];
-
- if(path.toString().endsWith(suffix)){
- LOG.info("Matching suffix with cached file:"+path.toString());
- File jdbmFile = new File(path.toString());
- LOG.info("\tInitializing the JDBM by cached file:"+path.toString());
- hashtable.initilizePersistentHash(jdbmFile);
- found = true;
- LOG.info("\tLoad back 1 JDBM file from distributed cache:"+path.toString());
- break;
- }
- }
- if(!found){
- LOG.error("Load nothing from Distributed Cache");
- throw new HiveException();
- }
- }
}
- }catch (Exception e){
+ } catch (Exception e) {
e.printStackTrace();
LOG.error("Load Hash Table error");
@@ -238,48 +218,50 @@ public class MapJoinOperator extends Abs
public void processOp(Object row, int tag) throws HiveException {
try {
- if(firstRow){
- //generate the map metadata
+ if (firstRow) {
+ // generate the map metadata
generateMapMetaData();
firstRow = false;
}
- if(this.getExecContext().inputFileChanged()){
- loadJDBM();
+ if (this.getExecContext().inputFileChanged()) {
+ loadHashTable();
}
// get alias
alias = order[tag];
- //alias = (byte)tag;
+ // alias = (byte)tag;
if ((lastAlias == null) || (!lastAlias.equals(alias))) {
nextSz = joinEmitInterval;
}
// compute keys and values as StandardObjects
- ArrayList<Object> key = JoinUtil.computeKeys(row, joinKeys.get(alias),
+ AbstractMapJoinKey key = JoinUtil.computeMapJoinKeys(row, joinKeys.get(alias),
joinKeysObjectInspectors.get(alias));
ArrayList<Object> value = JoinUtil.computeValues(row, joinValues.get(alias),
- joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
- joinFilterObjectInspectors.get(alias), noOuterJoin);
+ joinValuesObjectInspectors.get(alias), joinFilters.get(alias), joinFilterObjectInspectors
+ .get(alias), noOuterJoin);
// Add the value to the ArrayList
- storage.get((byte)tag).add(value);
+ storage.get((byte) tag).add(value);
for (Byte pos : order) {
if (pos.intValue() != tag) {
- MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
- MapJoinObjectValue o = mapJoinTables.get(pos).getMapJoinValueObject(keyMap);
+
+ MapJoinObjectValue o = mapJoinTables.get(pos).get(key);
+ MapJoinRowContainer<ArrayList<Object>> rowContainer = rowContainerMap.get(pos);
// there is no join-value or join-key has all null elements
- if (o == null || (hasAnyNulls(key))) {
+ if (o == null || key.hasAnyNulls()) {
if (noOuterJoin) {
storage.put(pos, emptyList);
} else {
storage.put(pos, dummyObjVectors[pos.intValue()]);
}
} else {
- storage.put(pos, o.getObj());
+ rowContainer.reset(o.getObj());
+ storage.put(pos, rowContainer);
}
}
}
@@ -288,7 +270,7 @@ public class MapJoinOperator extends Abs
checkAndGenObject();
// done with the row
- storage.get((byte)tag).clear();
+ storage.get((byte) tag).clear();
for (Byte pos : order) {
if (pos.intValue() != tag) {
@@ -301,20 +283,22 @@ public class MapJoinOperator extends Abs
throw new HiveException(e);
}
}
- private String getFileName(String path){
- if(path== null || path.length()==0) {
+
+ private String getFileName(String path) {
+ if (path == null || path.length() == 0) {
return null;
}
- int last_separator = path.lastIndexOf(Path.SEPARATOR)+1;
+ int last_separator = path.lastIndexOf(Path.SEPARATOR) + 1;
String fileName = path.substring(last_separator);
return fileName;
}
+
@Override
public void closeOp(boolean abort) throws HiveException {
- if(mapJoinTables != null) {
+ if (mapJoinTables != null) {
for (HashMapWrapper hashTable : mapJoinTables.values()) {
hashTable.close();
}
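
[Note] The rewritten loadHashTable() above no longer hard-codes ".jdbm" names; it asks Utilities.generatePath/generateFileName for the per-alias file name and then matches the DistributedCache local files by suffix. The sketch below isolates that matching loop with plain JDK types only. The "-<pos>-<fileName>.hashtable" suffix is an illustrative assumption (the real format comes from Utilities.generateFileName, which is not part of this hunk), and findHashTableFile stands in for the call to hashtable.initilizePersistentHash(path).

import java.util.Arrays;
import java.util.List;

public class HashTableFileLookup {
  // Illustrative suffix only; the actual name is produced by Utilities.generateFileName.
  static String suffixFor(byte pos, String currentFileName) {
    return "-" + pos + "-" + currentFileName + ".hashtable";
  }

  // Mirrors the lookup in loadHashTable(): scan the DistributedCache local files for
  // one whose name ends with the per-alias suffix, and fail loudly when none matches.
  static String findHashTableFile(List<String> localFiles, byte pos, String currentFileName) {
    String suffix = suffixFor(pos, currentFileName);
    for (String path : localFiles) {
      if (path.endsWith(suffix)) {
        return path; // the real code passes this to hashtable.initilizePersistentHash(path)
      }
    }
    throw new IllegalStateException("Load nothing from Distributed Cache for suffix " + suffix);
  }

  public static void main(String[] args) {
    List<String> cached = Arrays.asList(
        "/tmp/mapred/local/archive/-1-000000_0.hashtable",
        "/tmp/mapred/local/archive/-2-000000_0.hashtable");
    System.out.println(findHashTableFile(cached, (byte) 2, "000000_0"));
  }
}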
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Fri Nov 12 06:12:44 2010
@@ -18,26 +18,39 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -47,11 +60,14 @@ import org.apache.hadoop.util.Reflection
public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {
private Map<String, FetchOperator> fetchOperators;
- private File jdbmFile;
private JobConf job;
public static final Log l4j = LogFactory.getLog("MapredLocalTask");
- private MapOperator mo;
- // not sure we need this exec context; but all the operators in the work
+ static final String HADOOP_MEM_KEY = "HADOOP_HEAPSIZE";
+ static final String HADOOP_OPTS_KEY = "HADOOP_OPTS";
+ static final String[] HIVE_SYS_PROP = {"build.dir", "build.dir.hive"};
+ public static MemoryMXBean memoryMXBean;
+
+ // not sure we need this exec context; but all the operators in the work
// will pass this context through
private final ExecMapperContext execContext = new ExecMapperContext();
@@ -59,6 +75,13 @@ public class MapredLocalTask extends Ta
super();
}
+ public MapredLocalTask(MapredLocalWork plan, JobConf job, boolean isSilent) throws HiveException {
+ setWork(plan);
+ this.job = job;
+ LOG = LogFactory.getLog(this.getClass().getName());
+ console = new LogHelper(LOG, isSilent);
+ }
+
@Override
public void initialize(HiveConf conf, QueryPlan queryPlan,
DriverContext driverContext) {
@@ -66,12 +89,159 @@ public class MapredLocalTask extends Ta
job = new JobConf(conf, ExecDriver.class);
}
+ public static String now(){
+ Calendar cal = Calendar.getInstance();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ return sdf.format(cal.getTime());
+ }
+
+
+
@Override
-public int execute(DriverContext driverContext){
+ public int execute(DriverContext driverContext){
+ try{
+ //generate the cmd line to run in the child jvm
+ //String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
+ Context ctx = driverContext.getCtx();
+ String hiveJar = conf.getJar();
+
+ String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
+ String libJarsOption;
+
+ // write out the plan to a local file
+ Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");
+ OutputStream out = FileSystem.getLocal(conf).create(planPath);
+ MapredLocalWork plan = getWork();
+ LOG.info("Generating plan file " + planPath.toString());
+ Utilities.serializeMapRedLocalWork(plan, out);
+
+ String isSilent = "true".equalsIgnoreCase(System
+ .getProperty("test.silent")) ? "-nolog" : "";
+
+ String jarCmd;
+
+ jarCmd = hiveJar + " " + ExecDriver.class.getName() ;
+
+ String hiveConfArgs = ExecDriver.generateCmdLine(conf);
+ String cmdLine = hadoopExec + " jar " + jarCmd + " -localtask -plan "
+ + planPath.toString() + " " + isSilent + " " + hiveConfArgs;
+
+ String workDir = (new File(".")).getCanonicalPath();
+ String files = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.FILE);
+
+ if (!files.isEmpty()) {
+ cmdLine = cmdLine + " -files " + files;
+
+ workDir = (new Path(ctx.getLocalTmpFileURI())).toUri().getPath();
+
+ if (! (new File(workDir)).mkdir()) {
+ throw new IOException ("Cannot create tmp working dir: " + workDir);
+ }
+
+ for (String f: StringUtils.split(files, ',')) {
+ Path p = new Path(f);
+ String target = p.toUri().getPath();
+ String link = workDir + Path.SEPARATOR + p.getName();
+ if (FileUtil.symLink(target, link) != 0) {
+ throw new IOException ("Cannot link to added file: " + target + " from: " + link);
+ }
+ }
+ }
+
+ LOG.info("Executing: " + cmdLine);
+ Process executor = null;
+
+ // Inherit Java system variables
+ String hadoopOpts;
+ StringBuilder sb = new StringBuilder();
+ Properties p = System.getProperties();
+ for (String element : HIVE_SYS_PROP) {
+ if (p.containsKey(element)) {
+ sb.append(" -D" + element + "=" + p.getProperty(element));
+ }
+ }
+ hadoopOpts = sb.toString();
+ // Inherit the environment variables
+ String[] env;
+ Map<String, String> variables = new HashMap(System.getenv());
+ // The user can specify the hadoop memory
+
+ //if ("local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT))) {
+ // if we are running in local mode - then the amount of memory used
+ // by the child jvm can no longer default to the memory used by the
+ // parent jvm
+ //int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
+ int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
+ if (hadoopMem == 0) {
+ // remove env var that would default child jvm to use parent's memory
+ // as default. child jvm would use default memory for a hadoop client
+ variables.remove(HADOOP_MEM_KEY);
+ } else {
+ // user specified the memory for local mode hadoop run
+ console.printInfo(" set heap size\t"+hadoopMem+"MB");
+ variables.put(HADOOP_MEM_KEY, String.valueOf(hadoopMem));
+ }
+ //} else {
+ // nothing to do - we are not running in local mode - only submitting
+ // the job via a child process. in this case it's appropriate that the
+ // child jvm use the same memory as the parent jvm
+
+ //}
+
+ if (variables.containsKey(HADOOP_OPTS_KEY)) {
+ variables.put(HADOOP_OPTS_KEY, variables.get(HADOOP_OPTS_KEY)
+ + hadoopOpts);
+ } else {
+ variables.put(HADOOP_OPTS_KEY, hadoopOpts);
+ }
+ env = new String[variables.size()];
+ int pos = 0;
+ for (Map.Entry<String, String> entry : variables.entrySet()) {
+ String name = entry.getKey();
+ String value = entry.getValue();
+ env[pos++] = name + "=" + value;
+ }
+
+ // Run ExecDriver in another JVM
+ executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
+
+ StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
+ null, System.out);
+ StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
+ null, System.err);
+
+ outPrinter.start();
+ errPrinter.start();
+
+ int exitVal = executor.waitFor();
+
+ if (exitVal != 0) {
+ LOG.error("Execution failed with exit status: " + exitVal);
+ console.printError("Mapred Local Task Failed. Give up the map join strategy");
+ } else {
+ LOG.info("Execution completed successfully");
+ console.printInfo("Mapred Local Task Running Successfully. Keep using map join strategy");
+ }
+
+ return exitVal;
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.error("Exception: " + e.getMessage());
+ return (1);
+ }
+ }
+
+
+
+ public int executeFromChildJVM(DriverContext driverContext){
+
// check the local work
if(work == null){
return -1;
}
+ memoryMXBean = ManagementFactory.getMemoryMXBean();
+ console.printInfo(Utilities.now()+"\tStarting to launch local task to process map join ");
+ console.printInfo("\tmaximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
fetchOperators = new HashMap<String, FetchOperator>();
Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
execContext.setJc(job);
@@ -92,14 +262,19 @@ public int execute(DriverContext driverC
}else{
startForward(inputFileChangeSenstive,null);
}
+ console.printInfo(now()+"\tEnd of local task ");
} catch (Throwable e) {
if (e instanceof OutOfMemoryError) {
// Don't create a new object if we are already out of memory
- l4j.error("Out of Merror Error");
+ l4j.error("Out of Memory Error");
+ console.printError("[Warning] Small table is too large to put into memory");
+ return 2;
} else {
l4j.error("Hive Runtime Error: Map local work failed");
e.printStackTrace();
}
+ }finally{
+ console.printInfo(Utilities.now()+"\tFinish running local task");
}
return 0;
}
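
[Note] The new execute() path serializes the MapredLocalWork plan to a local plan.xml, assembles a "hadoop jar ... -localtask -plan <plan.xml>" command line, optionally overrides HADOOP_HEAPSIZE from HiveConf.ConfVars.HIVEHADOOPMAXMEM, and runs ExecDriver in a child JVM while echoing its output. Below is a minimal sketch of that child-process pattern using only JDK classes; ProcessBuilder and inheritIO() stand in for Runtime.exec plus Hive's StreamPrinter threads, and the command string is a placeholder.

import java.io.File;
import java.util.Map;

public class ChildJvmLauncher {
  // Minimal sketch of the pattern in MapredLocalTask.execute(): build the command
  // line, adjust the child environment, run it, and report the exit status.
  static int run(String cmdLine, String workDir, int hadoopMemMb) throws Exception {
    ProcessBuilder pb = new ProcessBuilder(cmdLine.trim().split("\\s+"));
    pb.directory(new File(workDir));
    pb.inheritIO(); // stand-in for the StreamPrinter threads draining stdout/stderr

    Map<String, String> env = pb.environment();
    if (hadoopMemMb == 0) {
      env.remove("HADOOP_HEAPSIZE"); // let the hadoop client use its default heap
    } else {
      env.put("HADOOP_HEAPSIZE", String.valueOf(hadoopMemMb));
    }

    int exitVal = pb.start().waitFor();
    if (exitVal != 0) {
      System.err.println("Mapred local task failed with exit status " + exitVal);
    }
    return exitVal;
  }

  public static void main(String[] args) throws Exception {
    // Placeholder command; the real task runs
    // "hadoop jar <hive-exec.jar> ExecDriver -localtask -plan <plan.xml> ..."
    System.exit(run("java -version", ".", 1024));
  }
}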
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Fri Nov 12 06:12:44 2010
@@ -28,8 +28,8 @@ import org.apache.hadoop.hive.ql.plan.Fi
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.ForwardDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.JDBMDummyDesc;
-import org.apache.hadoop.hive.ql.plan.JDBMSinkDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
@@ -87,10 +87,10 @@ public final class OperatorFactory {
LateralViewJoinOperator.class));
opvec.add(new OpTuple<LateralViewForwardDesc>(LateralViewForwardDesc.class,
LateralViewForwardOperator.class));
- opvec.add(new OpTuple<JDBMDummyDesc>(JDBMDummyDesc.class,
- JDBMDummyOperator.class));
- opvec.add(new OpTuple<JDBMSinkDesc>(JDBMSinkDesc.class,
- JDBMSinkOperator.class));
+ opvec.add(new OpTuple<HashTableDummyDesc>(HashTableDummyDesc.class,
+ HashTableDummyOperator.class));
+ opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
+ HashTableSinkOperator.class));
}
public static <T extends Serializable> Operator<T> get(Class<T> opClass) {
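
[Note] OperatorFactory maps each descriptor class to its operator class through a list of OpTuple entries; this commit simply re-points the JDBM entries at the new HashTableDummy and HashTableSink classes. A minimal sketch of that registry pattern follows; the Desc and operator stubs are hypothetical stand-ins, not Hive's real hierarchy.

import java.util.HashMap;
import java.util.Map;

public class OperatorRegistry {
  // Hypothetical stand-ins for Hive's descriptor/operator classes.
  interface Desc {}
  static class HashTableSinkDesc implements Desc {}
  static class HashTableSinkOperator {}

  // Descriptor class -> operator class, the role played by OperatorFactory.opvec.
  private static final Map<Class<? extends Desc>, Class<?>> registry =
      new HashMap<Class<? extends Desc>, Class<?>>();
  static {
    registry.put(HashTableSinkDesc.class, HashTableSinkOperator.class);
  }

  static Object get(Class<? extends Desc> descClass) throws Exception {
    Class<?> opClass = registry.get(descClass);
    if (opClass == null) {
      throw new IllegalArgumentException("No operator registered for " + descClass);
    }
    return opClass.newInstance(); // OperatorFactory also wires the desc into the operator
  }

  public static void main(String[] args) throws Exception {
    System.out.println(get(HashTableSinkDesc.class).getClass().getSimpleName());
  }
}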
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Nov 12 06:12:44 2010
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.util.JoinUtil;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Fri Nov 12 06:12:44 2010
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.exec.pe
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.util.JoinUtil;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -164,7 +163,7 @@ public class SkewJoinHandler {
// reset rowcontainer's serde, objectinspector, and tableDesc.
for (int i = 0; i < numAliases; i++) {
Byte alias = conf.getTagOrder()[i];
- RowContainer<ArrayList<Object>> rc = joinOp.storage.get(Byte
+ RowContainer<ArrayList<Object>> rc = (RowContainer)joinOp.storage.get(Byte
.valueOf((byte) i));
if (rc != null) {
rc.setSerDe(tblSerializers.get((byte) i), skewKeysTableObjectInspector
@@ -178,7 +177,7 @@ public class SkewJoinHandler {
if (skewKeyInCurrentGroup) {
String specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag);
- RowContainer<ArrayList<Object>> bigKey = joinOp.storage.get(Byte
+ RowContainer<ArrayList<Object>> bigKey = (RowContainer)joinOp.storage.get(Byte
.valueOf((byte) currBigKeyTag));
Path outputPath = getOperatorOutputPath(specPath);
FileSystem destFs = outputPath.getFileSystem(hconf);
@@ -188,7 +187,7 @@ public class SkewJoinHandler {
if (((byte) i) == currBigKeyTag) {
continue;
}
- RowContainer<ArrayList<Object>> values = joinOp.storage.get(Byte
+ RowContainer<ArrayList<Object>> values = (RowContainer)joinOp.storage.get(Byte
.valueOf((byte) i));
if (values != null) {
specPath = conf.getSmallKeysDirMap().get((byte) currBigKeyTag).get(
@@ -216,7 +215,7 @@ public class SkewJoinHandler {
skewKeyInCurrentGroup = false;
for (int i = 0; i < numAliases; i++) {
- RowContainer<ArrayList<Object>> rc = joinOp.storage.get(Byte
+ RowContainer<ArrayList<Object>> rc = (RowContainer)joinOp.storage.get(Byte
.valueOf((byte) i));
if (rc != null) {
rc.setKeyObject(dummyKey);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Nov 12 06:12:44 2010
@@ -43,8 +43,10 @@ import java.io.UnsupportedEncodingExcept
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -88,11 +90,12 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.serde.Constants;
@@ -193,14 +196,13 @@ public final class Utilities {
}
/**
- * Java 1.5 workaround. From
- * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403
+ * Java 1.5 workaround. From http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403
*/
public static class EnumDelegate extends DefaultPersistenceDelegate {
@Override
protected Expression instantiate(Object oldInstance, Encoder out) {
- return new Expression(Enum.class, "valueOf", new Object[] {
- oldInstance.getClass(), ((Enum<?>) oldInstance).name()});
+ return new Expression(Enum.class, "valueOf", new Object[] {oldInstance.getClass(),
+ ((Enum<?>) oldInstance).name()});
}
@Override
@@ -212,24 +214,26 @@ public final class Utilities {
public static class MapDelegate extends DefaultPersistenceDelegate {
@Override
protected Expression instantiate(Object oldInstance, Encoder out) {
- Map oldMap = (Map)oldInstance;
+ Map oldMap = (Map) oldInstance;
HashMap newMap = new HashMap(oldMap);
return new Expression(newMap, HashMap.class, "new", new Object[] {});
}
+
@Override
protected boolean mutatesTo(Object oldInstance, Object newInstance) {
return false;
}
+
@Override
protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
- java.util.Collection oldO = (java.util.Collection)oldInstance;
- java.util.Collection newO = (java.util.Collection)newInstance;
+ java.util.Collection oldO = (java.util.Collection) oldInstance;
+ java.util.Collection newO = (java.util.Collection) newInstance;
if (newO.size() != 0) {
- out.writeStatement(new Statement(oldInstance, "clear", new Object[]{}));
+ out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
}
for (Iterator i = oldO.iterator(); i.hasNext();) {
- out.writeStatement(new Statement(oldInstance, "add", new Object[]{i.next()}));
+ out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
}
}
}
@@ -237,24 +241,26 @@ public final class Utilities {
public static class SetDelegate extends DefaultPersistenceDelegate {
@Override
protected Expression instantiate(Object oldInstance, Encoder out) {
- Set oldSet = (Set)oldInstance;
+ Set oldSet = (Set) oldInstance;
HashSet newSet = new HashSet(oldSet);
return new Expression(newSet, HashSet.class, "new", new Object[] {});
}
+
@Override
protected boolean mutatesTo(Object oldInstance, Object newInstance) {
return false;
}
+
@Override
protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
- java.util.Collection oldO = (java.util.Collection)oldInstance;
- java.util.Collection newO = (java.util.Collection)newInstance;
+ java.util.Collection oldO = (java.util.Collection) oldInstance;
+ java.util.Collection newO = (java.util.Collection) newInstance;
if (newO.size() != 0) {
- out.writeStatement(new Statement(oldInstance, "clear", new Object[]{}));
+ out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
}
for (Iterator i = oldO.iterator(); i.hasNext();) {
- out.writeStatement(new Statement(oldInstance, "add", new Object[]{i.next()}));
+ out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
}
}
@@ -263,24 +269,26 @@ public final class Utilities {
public static class ListDelegate extends DefaultPersistenceDelegate {
@Override
protected Expression instantiate(Object oldInstance, Encoder out) {
- List oldList = (List)oldInstance;
+ List oldList = (List) oldInstance;
ArrayList newList = new ArrayList(oldList);
return new Expression(newList, ArrayList.class, "new", new Object[] {});
}
+
@Override
protected boolean mutatesTo(Object oldInstance, Object newInstance) {
return false;
}
+
@Override
protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
- java.util.Collection oldO = (java.util.Collection)oldInstance;
- java.util.Collection newO = (java.util.Collection)newInstance;
+ java.util.Collection oldO = (java.util.Collection) oldInstance;
+ java.util.Collection newO = (java.util.Collection) newInstance;
if (newO.size() != 0) {
- out.writeStatement(new Statement(oldInstance, "clear", new Object[]{}));
+ out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
}
for (Iterator i = oldO.iterator(); i.hasNext();) {
- out.writeStatement(new Statement(oldInstance, "add", new Object[]{i.next()}));
+ out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
}
}
@@ -297,12 +305,12 @@ public final class Utilities {
// Serialize the plan to the default hdfs instance
// Except for hadoop local mode execution where we should be
// able to get the plan directly from the cache
- if(!HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local")) {
+ if (!HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local")) {
// use the default file system of the job
FileSystem fs = planPath.getFileSystem(job);
FSDataOutputStream out = fs.create(planPath);
serializeMapRedWork(w, out);
-
+
// Set up distributed cache
DistributedCache.createSymlink(job);
String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN" + jobID;
@@ -310,7 +318,7 @@ public final class Utilities {
// set replication of the plan file to a high number. we use the same
// replication factor as used by the hadoop jobclient for job.xml etc.
- short replication = (short)job.getInt("mapred.submit.replication", 10);
+ short replication = (short) job.getInt("mapred.submit.replication", 10);
fs.setReplication(planPath, replication);
}
@@ -324,7 +332,7 @@ public final class Utilities {
}
public static String getHiveJobID(Configuration job) {
- String planPath= HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
+ String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
if (planPath != null) {
return (new Path(planPath)).getName();
}
@@ -346,17 +354,15 @@ public final class Utilities {
}
}
- public static ExprNodeDesc deserializeExpression(
- String s, Configuration conf) {
- byte [] bytes;
+ public static ExprNodeDesc deserializeExpression(String s, Configuration conf) {
+ byte[] bytes;
try {
bytes = s.getBytes("UTF-8");
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException("UTF-8 support required", ex);
}
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- XMLDecoder decoder = new XMLDecoder(
- bais, null, null, conf.getClassLoader());
+ XMLDecoder decoder = new XMLDecoder(bais, null, null, conf.getClassLoader());
try {
ExprNodeDesc expr = (ExprNodeDesc) decoder.readObject();
return expr;
@@ -368,36 +374,29 @@ public final class Utilities {
/**
* Serialize a single Task.
*/
- public static void serializeTasks(Task<? extends Serializable> t,
- OutputStream out) {
+ public static void serializeTasks(Task<? extends Serializable> t, OutputStream out) {
XMLEncoder e = new XMLEncoder(out);
// workaround for java 1.5
e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
- e.setPersistenceDelegate(Operator.ProgressCounter.class,
- new EnumDelegate());
+ e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
e.writeObject(t);
e.close();
}
- public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
+ public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
@Override
protected Expression instantiate(Object oldInstance, Encoder out) {
- return new Expression(oldInstance,
- oldInstance.getClass(),
- "new",
- null);
+ return new Expression(oldInstance, oldInstance.getClass(), "new", null);
}
@Override
- protected void initialize(Class type, Object oldInstance, Object newInstance,
- Encoder out) {
+ protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) {
Iterator ite = ((Collection) oldInstance).iterator();
while (ite.hasNext()) {
- out.writeStatement(new Statement(oldInstance, "add",
- new Object[] { ite.next() }));
- }
+ out.writeStatement(new Statement(oldInstance, "add", new Object[] {ite.next()}));
+ }
}
}
@@ -415,8 +414,7 @@ public final class Utilities {
// workaround for java 1.5
e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
- e.setPersistenceDelegate(Operator.ProgressCounter.class,
- new EnumDelegate());
+ e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
e.setPersistenceDelegate(org.datanucleus.store.types.sco.backed.Map.class, new MapDelegate());
e.setPersistenceDelegate(org.datanucleus.store.types.sco.backed.List.class, new ListDelegate());
@@ -428,8 +426,7 @@ public final class Utilities {
/**
* Deserialize the whole query plan.
*/
- public static QueryPlan deserializeQueryPlan(InputStream in,
- Configuration conf) {
+ public static QueryPlan deserializeQueryPlan(InputStream in, Configuration conf) {
XMLDecoder d = new XMLDecoder(in, null, null, conf.getClassLoader());
QueryPlan ret = (QueryPlan) d.readObject();
d.close();
@@ -437,9 +434,8 @@ public final class Utilities {
}
/**
- * Serialize the mapredWork object to an output stream. DO NOT use this to
- * write to standard output since it closes the output stream.
- * DO USE mapredWork.toXML() instead.
+ * Serialize the mapredWork object to an output stream. DO NOT use this to write to standard
+ * output since it closes the output stream. DO USE mapredWork.toXML() instead.
*/
public static void serializeMapRedWork(MapredWork w, OutputStream out) {
XMLEncoder e = new XMLEncoder(out);
@@ -450,8 +446,7 @@ public final class Utilities {
e.close();
}
- public static MapredWork deserializeMapRedWork(InputStream in,
- Configuration conf) {
+ public static MapredWork deserializeMapRedWork(InputStream in, Configuration conf) {
XMLDecoder d = new XMLDecoder(in, null, null, conf.getClassLoader());
MapredWork ret = (MapredWork) d.readObject();
d.close();
@@ -459,6 +454,26 @@ public final class Utilities {
}
/**
+ * Serialize the mapredLocalWork object to an output stream. DO NOT use this to write to standard
+ * output since it closes the output stream. DO USE mapredWork.toXML() instead.
+ */
+ public static void serializeMapRedLocalWork(MapredLocalWork w, OutputStream out) {
+ XMLEncoder e = new XMLEncoder(out);
+ // workaround for java 1.5
+ e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
+ e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
+ e.writeObject(w);
+ e.close();
+ }
+
+ public static MapredLocalWork deserializeMapRedLocalWork(InputStream in, Configuration conf) {
+ XMLDecoder d = new XMLDecoder(in, null, null, conf.getClassLoader());
+ MapredLocalWork ret = (MapredLocalWork) d.readObject();
+ d.close();
+ return (ret);
+ }
+
+ /**
* Tuple.
*
* @param <T>
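
[Note] The new serializeMapRedLocalWork/deserializeMapRedLocalWork pair follows the same java.beans XMLEncoder/XMLDecoder convention used for MapredWork above: register EnumDelegate for the enum types and round-trip the plan through XML. A minimal round-trip sketch with a plain JDK object (a HashMap standing in for MapredLocalWork, which is not shown here):

import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.HashMap;

public class XmlPlanRoundTrip {
  public static void main(String[] args) {
    // Stand-in for the plan object; the real code also calls
    // e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate()) etc.
    HashMap<String, Integer> plan = new HashMap<String, Integer>();
    plan.put("numAliases", 2);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    XMLEncoder e = new XMLEncoder(out);
    e.writeObject(plan);
    e.close();

    XMLDecoder d = new XMLDecoder(new ByteArrayInputStream(out.toByteArray()));
    HashMap<?, ?> copy = (HashMap<?, ?>) d.readObject();
    d.close();

    System.out.println(copy); // {numAliases=2}
  }
}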
@@ -507,19 +522,17 @@ public final class Utilities {
public static Random randGen = new Random();
/**
- * Gets the task id if we are running as a Hadoop job. Gets a random number
- * otherwise.
+ * Gets the task id if we are running as a Hadoop job. Gets a random number otherwise.
*/
public static String getTaskId(Configuration hconf) {
String taskid = (hconf == null) ? null : hconf.get("mapred.task.id");
if ((taskid == null) || taskid.equals("")) {
return ("" + Math.abs(randGen.nextInt()));
} else {
- /* extract the task and attempt id from the hadoop taskid.
- in version 17 the leading component was 'task_'. thereafter
- the leading component is 'attempt_'. in 17 - hadoop also
- seems to have used _map_ and _reduce_ to denote map/reduce
- task types
+ /*
+ * extract the task and attempt id from the hadoop taskid. in version 17 the leading component
+ * was 'task_'. thereafter the leading component is 'attempt_'. in 17 - hadoop also seems to
+ * have used _map_ and _reduce_ to denote map/reduce task types
*/
String ret = taskid.replaceAll(".*_[mr]_", "").replaceAll(".*_(map|reduce)_", "");
return (ret);
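
[Note] getTaskId() falls back to a random number when not running under Hadoop, and otherwise strips the leading task_/attempt_ component and the map/reduce marker from mapred.task.id with the two replaceAll calls shown above. A quick illustration on a made-up attempt id:

public class TaskIdExtract {
  public static void main(String[] args) {
    // Illustrative attempt id only; the replaceAll chain is the one used by getTaskId().
    String taskid = "attempt_201011120612_0001_m_000005_0";
    String ret = taskid.replaceAll(".*_[mr]_", "").replaceAll(".*_(map|reduce)_", "");
    System.out.println(ret); // prints 000005_0
  }
}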
@@ -587,29 +600,26 @@ public final class Utilities {
}
public static TableDesc getTableDesc(Table tbl) {
- return (new TableDesc(tbl.getDeserializer().getClass(), tbl
- .getInputFormatClass(), tbl.getOutputFormatClass(), tbl.getSchema()));
+ return (new TableDesc(tbl.getDeserializer().getClass(), tbl.getInputFormatClass(), tbl
+ .getOutputFormatClass(), tbl.getSchema()));
}
// column names and column types are all delimited by comma
public static TableDesc getTableDesc(String cols, String colTypes) {
return (new TableDesc(LazySimpleSerDe.class, SequenceFileInputFormat.class,
HiveSequenceFileOutputFormat.class, Utilities.makeProperties(
- org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, ""
- + Utilities.ctrlaCode,
- org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
- org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
}
- public static PartitionDesc getPartitionDesc(Partition part)
- throws HiveException {
+ public static PartitionDesc getPartitionDesc(Partition part) throws HiveException {
return (new PartitionDesc(part));
}
- public static void addMapWork(MapredWork mr, Table tbl, String alias,
- Operator<?> work) {
- mr.addMapWork(tbl.getDataLocation().getPath(), alias, work,
- new PartitionDesc(getTableDesc(tbl), null));
+ public static void addMapWork(MapredWork mr, Table tbl, String alias, Operator<?> work) {
+ mr.addMapWork(tbl.getDataLocation().getPath(), alias, work, new PartitionDesc(
+ getTableDesc(tbl), null));
}
private static String getOpTreeSkel_helper(Operator<?> op, String indent) {
@@ -641,8 +651,8 @@ public final class Utilities {
return Character.isWhitespace((char) c);
}
- public static boolean contentsEqual(InputStream is1, InputStream is2,
- boolean ignoreWhitespace) throws IOException {
+ public static boolean contentsEqual(InputStream is1, InputStream is2, boolean ignoreWhitespace)
+ throws IOException {
try {
if ((is1 == is2) || (is1 == null && is2 == null)) {
return true;
@@ -710,8 +720,7 @@ public final class Utilities {
EOF, TERMINATED
}
- public static StreamStatus readColumn(DataInput in, OutputStream out)
- throws IOException {
+ public static StreamStatus readColumn(DataInput in, OutputStream out) throws IOException {
while (true) {
int b;
@@ -731,8 +740,8 @@ public final class Utilities {
}
/**
- * Convert an output stream to a compressed output stream based on codecs and
- * compression options specified in the Job Configuration.
+ * Convert an output stream to a compressed output stream based on codecs and compression options
+ * specified in the Job Configuration.
*
* @param jc
* Job Configuration
@@ -747,9 +756,8 @@ public final class Utilities {
}
/**
- * Convert an output stream to a compressed output stream based on codecs
- * codecs in the Job Configuration. Caller specifies directly whether file is
- * compressed or not
+ * Convert an output stream to a compressed output stream based on codecs codecs in the Job
+ * Configuration. Caller specifies directly whether file is compressed or not
*
* @param jc
* Job Configuration
@@ -759,11 +767,11 @@ public final class Utilities {
* whether the output stream needs to be compressed or not
* @return compressed output stream
*/
- public static OutputStream createCompressedStream(JobConf jc,
- OutputStream out, boolean isCompressed) throws IOException {
+ public static OutputStream createCompressedStream(JobConf jc, OutputStream out,
+ boolean isCompressed) throws IOException {
if (isCompressed) {
- Class<? extends CompressionCodec> codecClass = FileOutputFormat
- .getOutputCompressorClass(jc, DefaultCodec.class);
+ Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(jc,
+ DefaultCodec.class);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
return codec.createOutputStream(out);
} else {
@@ -772,8 +780,8 @@ public final class Utilities {
}
/**
- * Based on compression option and configured output codec - get extension for
- * output file. This is only required for text files - not sequencefiles
+ * Based on compression option and configured output codec - get extension for output file. This
+ * is only required for text files - not sequencefiles
*
* @param jc
* Job Configuration
@@ -785,8 +793,8 @@ public final class Utilities {
if (!isCompressed) {
return "";
} else {
- Class<? extends CompressionCodec> codecClass = FileOutputFormat
- .getOutputCompressorClass(jc, DefaultCodec.class);
+ Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(jc,
+ DefaultCodec.class);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
return codec.getDefaultExtension();
}
@@ -807,17 +815,15 @@ public final class Utilities {
* Java Class for value
* @return output stream over the created sequencefile
*/
- public static SequenceFile.Writer createSequenceWriter(JobConf jc,
- FileSystem fs, Path file, Class<?> keyClass, Class<?> valClass)
- throws IOException {
+ public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs, Path file,
+ Class<?> keyClass, Class<?> valClass) throws IOException {
boolean isCompressed = FileOutputFormat.getCompressOutput(jc);
return createSequenceWriter(jc, fs, file, keyClass, valClass, isCompressed);
}
/**
- * Create a sequencefile output stream based on job configuration Uses user
- * supplied compression flag (rather than obtaining it from the Job
- * Configuration).
+ * Create a sequencefile output stream based on job configuration Uses user supplied compression
+ * flag (rather than obtaining it from the Job Configuration).
*
* @param jc
* Job configuration
@@ -831,26 +837,23 @@ public final class Utilities {
* Java Class for value
* @return output stream over the created sequencefile
*/
- public static SequenceFile.Writer createSequenceWriter(JobConf jc,
- FileSystem fs, Path file, Class<?> keyClass, Class<?> valClass,
- boolean isCompressed) throws IOException {
+ public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs, Path file,
+ Class<?> keyClass, Class<?> valClass, boolean isCompressed) throws IOException {
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
Class codecClass = null;
if (isCompressed) {
compressionType = SequenceFileOutputFormat.getOutputCompressionType(jc);
- codecClass = FileOutputFormat.getOutputCompressorClass(jc,
- DefaultCodec.class);
+ codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
}
- return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass,
- compressionType, codec));
+ return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec));
}
/**
- * Create a RCFile output stream based on job configuration Uses user supplied
- * compression flag (rather than obtaining it from the Job Configuration).
+ * Create a RCFile output stream based on job configuration Uses user supplied compression flag
+ * (rather than obtaining it from the Job Configuration).
*
* @param jc
* Job configuration
@@ -860,13 +863,12 @@ public final class Utilities {
* Path to be created
* @return output stream over the created rcfile
*/
- public static RCFile.Writer createRCFileWriter(JobConf jc, FileSystem fs,
- Path file, boolean isCompressed) throws IOException {
+ public static RCFile.Writer createRCFileWriter(JobConf jc, FileSystem fs, Path file,
+ boolean isCompressed) throws IOException {
CompressionCodec codec = null;
Class<?> codecClass = null;
if (isCompressed) {
- codecClass = FileOutputFormat.getOutputCompressorClass(jc,
- DefaultCodec.class);
+ codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
}
return new RCFile.Writer(fs, jc, file, null, codec);
@@ -875,8 +877,7 @@ public final class Utilities {
/**
* Shamelessly cloned from GenericOptionsParser.
*/
- public static String realFile(String newFile, Configuration conf)
- throws IOException {
+ public static String realFile(String newFile, Configuration conf) throws IOException {
Path path = new Path(newFile);
URI pathURI = path.toUri();
FileSystem fs;
@@ -897,8 +898,7 @@ public final class Utilities {
}
String file = path.makeQualified(fs).toString();
// For compatibility with hadoop 0.17, change file:/a/b/c to file:///a/b/c
- if (StringUtils.startsWith(file, "file:/")
- && !StringUtils.startsWith(file, "file:///")) {
+ if (StringUtils.startsWith(file, "file:/") && !StringUtils.startsWith(file, "file:///")) {
file = "file:///" + file.substring("file:/".length());
}
return file;
@@ -950,9 +950,8 @@ public final class Utilities {
}
/**
- * Rename src to dst, or in the case dst already exists, move files in src to
- * dst. If there is an existing file with the same name, the new file's name
- * will be appended with "_1", "_2", etc.
+ * Rename src to dst, or in the case dst already exists, move files in src to dst. If there is an
+ * existing file with the same name, the new file's name will be appended with "_1", "_2", etc.
*
* @param fs
* the FileSystem where src and dst are on.
@@ -962,17 +961,15 @@ public final class Utilities {
* the target directory
* @throws IOException
*/
- public static void rename(FileSystem fs, Path src, Path dst)
- throws IOException, HiveException {
+ public static void rename(FileSystem fs, Path src, Path dst) throws IOException, HiveException {
if (!fs.rename(src, dst)) {
throw new HiveException("Unable to move: " + src + " to: " + dst);
}
}
/**
- * Rename src to dst, or in the case dst already exists, move files in src to
- * dst. If there is an existing file with the same name, the new file's name
- * will be appended with "_1", "_2", etc.
+ * Rename src to dst, or in the case dst already exists, move files in src to dst. If there is an
+ * existing file with the same name, the new file's name will be appended with "_1", "_2", etc.
*
* @param fs
* the FileSystem where src and dst are on.
@@ -982,8 +979,8 @@ public final class Utilities {
* the target directory
* @throws IOException
*/
- public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst)
- throws IOException, HiveException {
+ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws IOException,
+ HiveException {
if (!fs.exists(dst)) {
if (!fs.rename(src, dst)) {
throw new HiveException("Unable to move: " + src + " to: " + dst);
@@ -1010,18 +1007,18 @@ public final class Utilities {
}
/**
- * The first group will contain the task id. The second group is the optional
- * extension. The file name looks like: "0_0" or "0_0.gz". There may be a leading
- * prefix (tmp_). Since getTaskId() can return an integer only - this should match
- * a pure integer as well
+ * The first group will contain the task id. The second group is the optional extension. The file
+ * name looks like: "0_0" or "0_0.gz". There may be a leading prefix (tmp_). Since getTaskId() can
+ * return an integer only - this should match a pure integer as well
*/
private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*?([0-9]+)(_[0-9])?(\\..*)?$");
/**
- * Get the task id from the filename.
- * It is assumed that the filename is derived from the output of getTaskId
+ * Get the task id from the filename. It is assumed that the filename is derived from the output
+ * of getTaskId
*
- * @param filename filename to extract taskid from
+ * @param filename
+ * filename to extract taskid from
*/
public static String getTaskIdFromFilename(String filename) {
String taskId = filename;
@@ -1032,8 +1029,8 @@ public final class Utilities {
Matcher m = fileNameTaskIdRegex.matcher(taskId);
if (!m.matches()) {
- LOG.warn("Unable to get task id from file name: " + filename
- + ". Using last component" + taskId + " as task id.");
+ LOG.warn("Unable to get task id from file name: " + filename + ". Using last component"
+ + taskId + " as task id.");
} else {
taskId = m.group(1);
}
@@ -1042,17 +1039,16 @@ public final class Utilities {
}
/**
- * Replace the task id from the filename.
- * It is assumed that the filename is derived from the output of getTaskId
+ * Replace the task id from the filename. It is assumed that the filename is derived from the
+ * output of getTaskId
*
- * @param filename filename to replace taskid
- * "0_0" or "0_0.gz" by 33 to
- * "33_0" or "33_0.gz"
+ * @param filename
+ * filename to replace taskid "0_0" or "0_0.gz" by 33 to "33_0" or "33_0.gz"
*/
public static String replaceTaskIdFromFilename(String filename, int bucketNum) {
String taskId = getTaskIdFromFilename(filename);
String newTaskId = replaceTaskId(taskId, bucketNum);
- String ret = replaceTaskIdFromFilename(filename, taskId, newTaskId);
+ String ret = replaceTaskIdFromFilename(filename, taskId, newTaskId);
return (ret);
}
@@ -1062,21 +1058,22 @@ public final class Utilities {
int taskIdLen = taskId.length();
StringBuffer s = new StringBuffer();
for (int i = 0; i < taskIdLen - bucketNumLen; i++) {
- s.append("0");
+ s.append("0");
}
return s.toString() + strBucketNum;
}
/**
- * Replace the oldTaskId appearing in the filename by the newTaskId.
- * The string oldTaskId could appear multiple times, we should only replace the last one.
+ * Replace the oldTaskId appearing in the filename by the newTaskId. The string oldTaskId could
+ * appear multiple times, we should only replace the last one.
+ *
* @param filename
* @param oldTaskId
* @param newTaskId
* @return
*/
- private static String replaceTaskIdFromFilename(String filename,
- String oldTaskId, String newTaskId) {
+ private static String replaceTaskIdFromFilename(String filename, String oldTaskId,
+ String newTaskId) {
String[] spl = filename.split(oldTaskId);
@@ -1085,27 +1082,31 @@ public final class Utilities {
}
StringBuffer snew = new StringBuffer();
- for (int idx = 0; idx < spl.length-1; idx++) {
+ for (int idx = 0; idx < spl.length - 1; idx++) {
if (idx > 0) {
snew.append(oldTaskId);
}
snew.append(spl[idx]);
}
snew.append(newTaskId);
- snew.append(spl[spl.length-1]);
+ snew.append(spl[spl.length - 1]);
return snew.toString();
}
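
Because the pieces produced by split() are rejoined with oldTaskId for every element except the last, only the final occurrence is rewritten. Illustrative inputs and outputs (the file names are invented for this example):

    // replaceTaskIdFromFilename("000001_0.gz",        "000001", "000005") -> "000005_0.gz"
    // replaceTaskIdFromFilename("000001/000001_0.gz", "000001", "000005") -> "000001/000005_0.gz"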
/**
* Get all file status from a root path and recursively go deep into certain levels.
- * @param path the root path
- * @param level the depth of directory should explore
- * @param fs the file system
+ *
+ * @param path
+ * the root path
+ * @param level
+ * the depth of directories to explore
+ * @param fs
+ * the file system
* @return array of FileStatus
* @throws IOException
*/
- public static FileStatus[] getFileStatusRecurse(Path path, int level,
- FileSystem fs) throws IOException {
+ public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
+ throws IOException {
// construct a path pattern (e.g., /*/*) to find all dynamically generated paths
StringBuilder sb = new StringBuilder(path.toUri().getPath());
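
The remainder of the method is outside this hunk; a hedged sketch of the glob-based approach the comment describes, assuming the pattern is expanded with FileSystem.globStatus (the helper name is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical helper mirroring the idea above, not the committed body.
    static FileStatus[] listAtDepth(Path root, int level, FileSystem fs) throws IOException {
      StringBuilder sb = new StringBuilder(root.toUri().getPath());
      for (int i = 0; i < level; i++) {
        sb.append(Path.SEPARATOR).append("*");  // e.g. /warehouse/tbl/*/* when level == 2
      }
      return fs.globStatus(new Path(sb.toString()));
    }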
@@ -1117,8 +1118,8 @@ public final class Utilities {
}
/**
- * Remove all temporary files and duplicate (double-committed) files from a
- * given directory.
+ * Remove all temporary files and duplicate (double-committed) files from a given directory.
+ *
* @return a list of path names corresponding to should-be-created empty buckets.
*/
public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
@@ -1126,12 +1127,12 @@ public final class Utilities {
}
/**
- * Remove all temporary files and duplicate (double-committed) files from a
- * given directory.
+ * Remove all temporary files and duplicate (double-committed) files from a given directory.
+ *
* @return a list of path names corresponding to should-be-created empty buckets.
*/
- public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path, DynamicPartitionCtx dpCtx)
- throws IOException {
+ public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+ DynamicPartitionCtx dpCtx) throws IOException {
if (path == null) {
return null;
}
@@ -1142,7 +1143,8 @@ public final class Utilities {
HashMap<String, FileStatus> taskIDToFile = null;
for (int i = 0; i < parts.length; ++i) {
- assert parts[i].isDir(): "dynamic partition " + parts[i].getPath() + " is not a direcgtory";
+ assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
+ + " is not a direcgtory";
FileStatus[] items = fs.listStatus(parts[i].getPath());
// remove empty directory since DP insert should not generate empty partitions.
@@ -1162,7 +1164,7 @@ public final class Utilities {
// get the missing buckets and generate empty buckets
String taskID1 = taskIDToFile.keySet().iterator().next();
Path bucketPath = taskIDToFile.values().iterator().next().getPath();
- for (int j = 0; j < dpCtx.getNumBuckets(); ++j ) {
+ for (int j = 0; j < dpCtx.getNumBuckets(); ++j) {
String taskID2 = replaceTaskId(taskID1, j);
if (!taskIDToFile.containsKey(taskID2)) {
// create empty bucket, file name should be derived from taskID2
@@ -1177,11 +1179,10 @@ public final class Utilities {
removeTempOrDuplicateFiles(items, fs);
}
return result;
- }
+ }
- public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(
- FileStatus[] items, FileSystem fs)
- throws IOException {
+ public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items,
+ FileSystem fs) throws IOException {
if (items == null || fs == null) {
return null;
@@ -1214,12 +1215,12 @@ public final class Utilities {
long len1 = toDelete.getLen();
long len2 = taskIdToFile.get(taskId).getLen();
if (!fs.delete(toDelete.getPath(), true)) {
- throw new IOException("Unable to delete duplicate file: "
- + toDelete.getPath() + ". Existing file: " + taskIdToFile.get(taskId).getPath());
+ throw new IOException("Unable to delete duplicate file: " + toDelete.getPath()
+ + ". Existing file: " + taskIdToFile.get(taskId).getPath());
} else {
LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
- + len1 + ". Existing file: " + taskIdToFile.get(taskId).getPath()
- + " with length " + len2);
+ + len1 + ". Existing file: " + taskIdToFile.get(taskId).getPath() + " with length "
+ + len2);
}
}
}
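
Beyond the message formatting changes in this hunk, the surrounding logic maps each task id to a single surviving file and deletes the rest. A sketch of that idea, under the assumption that the larger of two files sharing a task id is the one kept; the selection logic itself is not visible here and the method name is invented:

    import java.io.IOException;
    import java.util.HashMap;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;

    // Sketch of the de-duplication idea, not the committed method.
    static void dedupByTaskId(FileStatus[] items, FileSystem fs) throws IOException {
      HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
      for (FileStatus item : items) {
        String taskId = Utilities.getTaskIdFromFilename(item.getPath().getName());
        FileStatus existing = taskIdToFile.get(taskId);
        if (existing == null) {
          taskIdToFile.put(taskId, item);
        } else {
          // Assumed policy for this sketch: keep the larger file, delete the other.
          FileStatus toDelete = existing.getLen() >= item.getLen() ? item : existing;
          FileStatus toKeep = (toDelete == item) ? existing : item;
          taskIdToFile.put(taskId, toKeep);
          if (!fs.delete(toDelete.getPath(), true)) {
            throw new IOException("Unable to delete duplicate file: " + toDelete.getPath());
          }
        }
      }
    }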
@@ -1237,8 +1238,7 @@ public final class Utilities {
* @param newPaths
* Array of classpath elements
*/
- public static ClassLoader addToClassPath(ClassLoader cloader,
- String[] newPaths) throws Exception {
+ public static ClassLoader addToClassPath(ClassLoader cloader, String[] newPaths) throws Exception {
URLClassLoader loader = (URLClassLoader) cloader;
List<URL> curPath = Arrays.asList(loader.getURLs());
ArrayList<URL> newPath = new ArrayList<URL>();
@@ -1270,8 +1270,7 @@ public final class Utilities {
* @param pathsToRemove
* Array of classpath elements
*/
- public static void removeFromClassPath(String[] pathsToRemove)
- throws Exception {
+ public static void removeFromClassPath(String[] pathsToRemove) throws Exception {
Thread curThread = Thread.currentThread();
URLClassLoader loader = (URLClassLoader) curThread.getContextClassLoader();
Set<URL> newPath = new HashSet<URL>(Arrays.asList(loader.getURLs()));
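
addToClassPath and removeFromClassPath both work from an adjusted set of the context URLClassLoader's URLs. A sketch of the add side, assuming a simple file-path-to-URL conversion; the committed code performs its own conversion and installation of the loader:

    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch only: merge the new entries with the loader's existing URLs and
    // build a fresh URLClassLoader on top of the old one.
    static ClassLoader withExtraPaths(ClassLoader cloader, String[] newPaths) throws Exception {
      URLClassLoader loader = (URLClassLoader) cloader;
      List<URL> urls = new ArrayList<URL>(Arrays.asList(loader.getURLs()));
      for (String p : newPaths) {
        URL u = new java.io.File(p).toURI().toURL();
        if (!urls.contains(u)) {
          urls.add(u);
        }
      }
      return new URLClassLoader(urls.toArray(new URL[0]), loader);
    }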
@@ -1307,8 +1306,7 @@ public final class Utilities {
return names;
}
- public static List<String> getColumnNamesFromFieldSchema(
- List<FieldSchema> partCols) {
+ public static List<String> getColumnNamesFromFieldSchema(List<FieldSchema> partCols) {
List<String> names = new ArrayList<String>();
for (FieldSchema o : partCols) {
names.add(o.getName());
@@ -1344,8 +1342,8 @@ public final class Utilities {
return names;
}
- public static void validateColumnNames(List<String> colNames,
- List<String> checkCols) throws SemanticException {
+ public static void validateColumnNames(List<String> colNames, List<String> checkCols)
+ throws SemanticException {
Iterator<String> checkColsIter = checkCols.iterator();
while (checkColsIter.hasNext()) {
String toCheck = checkColsIter.next();
@@ -1365,16 +1363,15 @@ public final class Utilities {
}
/**
- * Gets the default notification interval to send progress updates to the
- * tracker. Useful for operators that may not output data for a while.
+ * Gets the default notification interval to send progress updates to the tracker. Useful for
+ * operators that may not output data for a while.
*
* @param hconf
* @return the interval in milliseconds
*/
public static int getDefaultNotificationInterval(Configuration hconf) {
int notificationInterval;
- Integer expInterval = Integer.decode(hconf
- .get("mapred.tasktracker.expiry.interval"));
+ Integer expInterval = Integer.decode(hconf.get("mapred.tasktracker.expiry.interval"));
if (expInterval != null) {
notificationInterval = expInterval.intValue() / 2;
@@ -1386,12 +1383,14 @@ public final class Utilities {
}
/**
- * Copies the storage handler properties configured for a table descriptor
- * to a runtime job configuration.
+ * Copies the storage handler properties configured for a table descriptor to a runtime job
+ * configuration.
*
- * @param tbl table descriptor from which to read
+ * @param tbl
+ * table descriptor from which to read
*
- * @param job configuration which receives configured properties
+ * @param job
+ * configuration which receives configured properties
*/
public static void copyTableJobPropertiesToConf(TableDesc tbl, JobConf job) {
Map<String, String> jobProperties = tbl.getJobProperties();
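
The rest of the method is not part of this hunk; the copy described by the Javadoc amounts to something like the following sketch, where the null handling is an assumption:

    if (jobProperties != null) {
      for (Map.Entry<String, String> e : jobProperties.entrySet()) {
        job.set(e.getKey(), e.getValue());  // push each table-level property into the JobConf
      }
    }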
@@ -1406,14 +1405,17 @@ public final class Utilities {
/**
* Calculate the total size of input files.
*
- * @param job the hadoop job conf.
- * @param work map reduce job plan
- * @param filter filter to apply to the input paths before calculating size
+ * @param job
+ * the hadoop job conf.
+ * @param work
+ * map reduce job plan
+ * @param filter
+ * filter to apply to the input paths before calculating size
* @return the summary of all the input paths.
* @throws IOException
*/
- public static ContentSummary getInputSummary
- (Context ctx, MapredWork work, PathFilter filter) throws IOException {
+ public static ContentSummary getInputSummary(Context ctx, MapredWork work, PathFilter filter)
+ throws IOException {
long[] summary = {0, 0, 0};
@@ -1422,7 +1424,7 @@ public final class Utilities {
try {
Path p = new Path(path);
- if(filter != null && !filter.accept(p)) {
+ if (filter != null && !filter.accept(p)) {
continue;
}
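
The summary array accumulates total length, file count and directory count across the input paths. A sketch of the per-path accumulation, where conf stands in for whatever Configuration the method derives from ctx and the ordering of the three counters is assumed:

    FileSystem fs = p.getFileSystem(conf);
    ContentSummary cs = fs.getContentSummary(p);
    summary[0] += cs.getLength();          // total bytes
    summary[1] += cs.getFileCount();       // number of files
    summary[2] += cs.getDirectoryCount();  // number of directories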
@@ -1459,19 +1461,18 @@ public final class Utilities {
return true;
}
- public static List<ExecDriver> getMRTasks (List<Task<? extends Serializable>> tasks) {
- List<ExecDriver> mrTasks = new ArrayList<ExecDriver> ();
- if(tasks != null) {
+ public static List<ExecDriver> getMRTasks(List<Task<? extends Serializable>> tasks) {
+ List<ExecDriver> mrTasks = new ArrayList<ExecDriver>();
+ if (tasks != null) {
getMRTasks(tasks, mrTasks);
}
return mrTasks;
}
- private static void getMRTasks (List<Task<? extends Serializable>> tasks,
- List<ExecDriver> mrTasks) {
+ private static void getMRTasks(List<Task<? extends Serializable>> tasks, List<ExecDriver> mrTasks) {
for (Task<? extends Serializable> task : tasks) {
- if (task instanceof ExecDriver && !mrTasks.contains((ExecDriver)task)) {
- mrTasks.add((ExecDriver)task);
+ if (task instanceof ExecDriver && !mrTasks.contains((ExecDriver) task)) {
+ mrTasks.add((ExecDriver) task);
}
if (task.getDependentTasks() != null) {
@@ -1485,45 +1486,43 @@ public final class Utilities {
}
/**
- * Construct a list of full partition spec from Dynamic Partition Context and
- * the directory names corresponding to these dynamic partitions.
+ * Construct a list of full partition spec from Dynamic Partition Context and the directory names
+ * corresponding to these dynamic partitions.
*/
public static List<LinkedHashMap<String, String>> getFullDPSpecs(Configuration conf,
- DynamicPartitionCtx dpCtx)
- throws HiveException {
+ DynamicPartitionCtx dpCtx) throws HiveException {
try {
Path loadPath = new Path(dpCtx.getRootPath());
FileSystem fs = loadPath.getFileSystem(conf);
- int numDPCols = dpCtx.getNumDPCols();
- FileStatus[] status = Utilities.getFileStatusRecurse(loadPath, numDPCols, fs);
+ int numDPCols = dpCtx.getNumDPCols();
+ FileStatus[] status = Utilities.getFileStatusRecurse(loadPath, numDPCols, fs);
- if (status.length == 0) {
- LOG.warn("No partition is genereated by dynamic partitioning");
- return null;
- }
-
- // partial partition specification
- Map<String, String> partSpec = dpCtx.getPartSpec();
-
- // list of full partition specification
- List<LinkedHashMap<String, String>> fullPartSpecs =
- new ArrayList<LinkedHashMap<String, String>>();
-
- // for each dynamically created DP directory, construct a full partition spec
- // and load the partition based on that
- for (int i= 0; i < status.length; ++i) {
- // get the dynamically created directory
- Path partPath = status[i].getPath();
- assert fs.getFileStatus(partPath).isDir():
- "partitions " + partPath + " is not a directory !";
-
- // generate a full partition specification
- LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
- Warehouse.makeSpecFromName(fullPartSpec, partPath);
- fullPartSpecs.add(fullPartSpec);
- }
- return fullPartSpecs;
+ if (status.length == 0) {
+ LOG.warn("No partition is genereated by dynamic partitioning");
+ return null;
+ }
+
+ // partial partition specification
+ Map<String, String> partSpec = dpCtx.getPartSpec();
+
+ // list of full partition specification
+ List<LinkedHashMap<String, String>> fullPartSpecs = new ArrayList<LinkedHashMap<String, String>>();
+
+ // for each dynamically created DP directory, construct a full partition spec
+ // and load the partition based on that
+ for (int i = 0; i < status.length; ++i) {
+ // get the dynamically created directory
+ Path partPath = status[i].getPath();
+ assert fs.getFileStatus(partPath).isDir() : "partition " + partPath
+ + " is not a directory!";
+
+ // generate a full partition specification
+ LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
+ Warehouse.makeSpecFromName(fullPartSpec, partPath);
+ fullPartSpecs.add(fullPartSpec);
+ }
+ return fullPartSpecs;
} catch (IOException e) {
throw new HiveException(e);
}
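
Each dynamically created directory encodes its partition column values in its name, and Warehouse.makeSpecFromName merges them into the partial spec. An illustrative call; the directory name and column values below are made up:

    // Suppose partSpec is empty and a DP directory .../ds=2010-11-12/hr=12 was produced.
    LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
    Warehouse.makeSpecFromName(fullPartSpec, new Path("/warehouse/tbl/ds=2010-11-12/hr=12"));
    // fullPartSpec now contains {ds=2010-11-12, hr=12}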
@@ -1551,9 +1550,7 @@ public final class Utilities {
columnNames.append(colInfo.getInternalName());
}
String columnNamesString = columnNames.toString();
- jobConf.set(
- Constants.LIST_COLUMNS,
- columnNamesString);
+ jobConf.set(Constants.LIST_COLUMNS, columnNamesString);
}
public static void validatePartSpec(Table tbl, Map<String, String> partSpec)
@@ -1561,13 +1558,37 @@ public final class Utilities {
List<FieldSchema> parts = tbl.getPartitionKeys();
Set<String> partCols = new HashSet<String>(parts.size());
- for (FieldSchema col: parts) {
+ for (FieldSchema col : parts) {
partCols.add(col.getName());
}
- for (String col: partSpec.keySet()) {
+ for (String col : partSpec.keySet()) {
if (!partCols.contains(col)) {
throw new SemanticException(ErrorMsg.NONEXISTPARTCOL.getMsg(col));
}
}
}
+
+ public static String suffix = ".hashtable";
+
+ public static String generatePath(String baseURI, Byte tag, String bigBucketFileName) {
+ String path = new String(baseURI + Path.SEPARATOR + "-" + tag + "-" + bigBucketFileName
+ + suffix);
+ return path;
+ }
+
+ public static String generateFileName(Byte tag, String bigBucketFileName) {
+ String fileName = new String("-" + tag + "-" + bigBucketFileName + suffix);
+ return fileName;
+ }
+
+ public static String generateTmpURI(String baseURI, String id) {
+ String tmpFileURI = new String(baseURI + Path.SEPARATOR + "HashTable-" + id);
+ return tmpFileURI;
+ }
+
+ public static String now() {
+ Calendar cal = Calendar.getInstance();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
+ return sdf.format(cal.getTime());
+ }
}
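
The new helpers at the end of the class fix the naming convention for the local hash table files, keyed by table tag and big-table bucket file name. A quick illustration with made-up values:

    // Hypothetical values, shown only to illustrate the naming convention.
    String tmpURI = Utilities.generateTmpURI("/tmp/hive-user", "job_1");
    // -> /tmp/hive-user/HashTable-job_1
    String fileName = Utilities.generateFileName((byte) 1, "bucket_000001");
    // -> -1-bucket_000001.hashtable
    String fullPath = Utilities.generatePath(tmpURI, (byte) 1, "bucket_000001");
    // -> /tmp/hive-user/HashTable-job_1/-1-bucket_000001.hashtable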
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java?rev=1034276&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java Fri Nov 12 06:12:44 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Map join key object; the common base class for map join key implementations.
+ */
+public abstract class AbstractMapJoinKey implements Externalizable {
+
+ protected static int metadataTag = -1;
+
+ public AbstractMapJoinKey() {
+ }
+
+ @Override
+ public abstract boolean equals(Object o);
+
+ @Override
+ public abstract int hashCode();
+
+ public abstract void readExternal(ObjectInput in) throws IOException, ClassNotFoundException;
+
+ public abstract void writeExternal(ObjectOutput out) throws IOException;
+
+ public abstract boolean hasAnyNulls();
+
+}
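
A hypothetical minimal subclass is sketched below to illustrate the contract; the class name and the single-object layout are inventions for this example and are not the key implementations added by this commit:

    import java.io.IOException;
    import java.io.ObjectInput;
    import java.io.ObjectOutput;
    import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;

    public class SimpleMapJoinKey extends AbstractMapJoinKey {

      private Object key;  // assumed serializable for this sketch

      // Externalizable requires a public no-arg constructor.
      public SimpleMapJoinKey() {
      }

      public SimpleMapJoinKey(Object key) {
        this.key = key;
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof SimpleMapJoinKey)) {
          return false;
        }
        Object other = ((SimpleMapJoinKey) o).key;
        return key == null ? other == null : key.equals(other);
      }

      @Override
      public int hashCode() {
        return key == null ? 0 : key.hashCode();
      }

      @Override
      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        key = in.readObject();
      }

      @Override
      public void writeExternal(ObjectOutput out) throws IOException {
        out.writeObject(key);
      }

      @Override
      public boolean hasAnyNulls() {
        return key == null;
      }
    }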
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java?rev=1034276&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java Fri Nov 12 06:12:44 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public abstract class AbstractRowContainer<Row> {
+
+ public AbstractRowContainer() {
+
+ }
+
+ public abstract void add(Row t) throws HiveException;
+
+ public abstract Row first() throws HiveException;
+
+ public abstract Row next() throws HiveException;
+
+ /**
+ * Get the number of elements in the RowContainer.
+ *
+ * @return number of elements in the RowContainer
+ */
+
+ public abstract int size();
+
+ /**
+ * Remove all elements in the RowContainer.
+ */
+
+ public abstract void clear() throws HiveException;
+}
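
For illustration, a hypothetical purely in-memory implementation of this contract; the class name and the convention that first()/next() return null when the container is exhausted are assumptions of this sketch:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hive.ql.exec.persistence.AbstractRowContainer;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class InMemoryRowContainer<Row> extends AbstractRowContainer<Row> {

      private final List<Row> rows = new ArrayList<Row>();
      private int cursor = 0;

      @Override
      public void add(Row t) throws HiveException {
        rows.add(t);
      }

      @Override
      public Row first() throws HiveException {
        cursor = 0;        // restart iteration
        return next();
      }

      @Override
      public Row next() throws HiveException {
        return cursor < rows.size() ? rows.get(cursor++) : null;
      }

      @Override
      public int size() {
        return rows.size();
      }

      @Override
      public void clear() throws HiveException {
        rows.clear();
        cursor = 0;
      }
    }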