Posted to commits@hive.apache.org by he...@apache.org on 2010/11/12 07:12:47 UTC

svn commit: r1034276 [2/14] - in /hive/trunk: ./ ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/optimizer...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Nov 12 06:12:44 2010
@@ -17,9 +17,8 @@
  */
 
 package org.apache.hadoop.hive.ql.exec;
-import java.io.File;
+
 import java.io.Serializable;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,15 +29,15 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.JDBMSinkOperator.JDBMSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.util.JoinUtil;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -48,28 +47,24 @@ import org.apache.hadoop.util.Reflection
 /**
  * Map side Join operator implementation.
  */
-public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implements
-    Serializable {
+public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
-  private static final Log LOG = LogFactory.getLog(MapJoinOperator.class
-      .getName());
+  private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
 
 
-  protected transient Map<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> mapJoinTables;
+  protected transient Map<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> mapJoinTables;
 
   private static final transient String[] FATAL_ERR_MSG = {
       null, // counter value 0 means no error
       "Mapside join size exceeds hive.mapjoin.maxsize. "
-          + "Please increase that or remove the mapjoin hint."
-      };
-
-
-
+          + "Please increase that or remove the mapjoin hint."};
 
+  protected transient Map<Byte, MapJoinRowContainer<ArrayList<Object>>> rowContainerMap;
   transient int metadataKeyTag;
   transient int[] metadataValueTag;
   transient int maxMapJoinSize;
   private int bigTableAlias;
+
   public MapJoinOperator() {
   }
 
@@ -82,8 +77,7 @@ public class MapJoinOperator extends Abs
 
     super.initializeOp(hconf);
 
-    maxMapJoinSize = HiveConf.getIntVar(hconf,
-        HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
+    maxMapJoinSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
 
     metadataValueTag = new int[numAliases];
     for (int pos = 0; pos < numAliases; pos++) {
@@ -93,24 +87,23 @@ public class MapJoinOperator extends Abs
     metadataKeyTag = -1;
     bigTableAlias = order[posBigTable];
 
-    mapJoinTables = new HashMap<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>>();
-
+    mapJoinTables = new HashMap<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>>();
+    rowContainerMap = new HashMap<Byte, MapJoinRowContainer<ArrayList<Object>>>();
     // initialize the hash tables for other tables
     for (int pos = 0; pos < numAliases; pos++) {
       if (pos == posBigTable) {
         continue;
       }
 
-      int cacheSize = HiveConf.getIntVar(hconf,
-          HiveConf.ConfVars.HIVEMAPJOINCACHEROWS);
-      HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable = new HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>(
-          cacheSize);
+      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
 
       mapJoinTables.put(Byte.valueOf((byte) pos), hashTable);
+      MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
+      rowContainerMap.put(Byte.valueOf((byte) pos), rowContainer);
     }
 
-  }
 
+  }
 
   @Override
   protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) {
@@ -118,113 +111,100 @@ public class MapJoinOperator extends Abs
         + FATAL_ERR_MSG[(int) counterCode]);
   }
 
-
-  public void generateMapMetaData() throws HiveException,SerDeException{
-    //generate the meta data for key
-    //index for key is -1
+  public void generateMapMetaData() throws HiveException, SerDeException {
+    // generate the meta data for key
+    // index for key is -1
     TableDesc keyTableDesc = conf.getKeyTblDesc();
-    SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(
-        keyTableDesc.getDeserializerClass(), null);
+    SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
+        null);
     keySerializer.initialize(null, keyTableDesc.getProperties());
-    MapJoinMetaData.put(Integer.valueOf(metadataKeyTag),
-        new JDBMSinkObjectCtx(
-        ObjectInspectorUtils
-        .getStandardObjectInspector(keySerializer
-        .getObjectInspector(),
-        ObjectInspectorCopyOption.WRITABLE), keySerializer,
-        keyTableDesc, hconf));
+    MapJoinMetaData.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
+        ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
+            ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, hconf));
 
-    //index for values is just alias
+    // index for values is just alias
     for (int tag = 0; tag < order.length; tag++) {
       int alias = (int) order[tag];
 
-      if(alias == this.bigTableAlias){
+      if (alias == this.bigTableAlias) {
         continue;
       }
 
 
       TableDesc valueTableDesc = conf.getValueTblDescs().get(tag);
-      SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc
-          .getDeserializerClass(), null);
+      SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
+          null);
       valueSerDe.initialize(null, valueTableDesc.getProperties());
 
-      MapJoinMetaData.put(Integer.valueOf(alias),
-          new JDBMSinkObjectCtx(ObjectInspectorUtils
+      MapJoinMetaData.put(Integer.valueOf(alias), new HashTableSinkObjectCtx(ObjectInspectorUtils
           .getStandardObjectInspector(valueSerDe.getObjectInspector(),
-          ObjectInspectorCopyOption.WRITABLE), valueSerDe,
-          valueTableDesc, hconf));
+              ObjectInspectorCopyOption.WRITABLE), valueSerDe, valueTableDesc, hconf));
     }
   }
 
-  private void loadJDBM() throws HiveException{
+  private void loadHashTable() throws HiveException {
     boolean localMode = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPJT).equals("local");
-    String tmpURI =null;
-    HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashtable;
+    String tmpURI = null;
+    HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashtable;
     Byte pos;
-    int alias;
 
-    String currentInputFile = HiveConf.getVar(hconf,
-        HiveConf.ConfVars.HADOOPMAPFILENAME);
+    String currentInputFile = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME);
+    LOG.info("******* Load from HashTable File: input : " + currentInputFile);
 
     String currentFileName;
 
-    if(this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
-      currentFileName= this.getFileName(currentInputFile);
+    if (this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
+      currentFileName = this.getFileName(currentInputFile);
     } else {
-      currentFileName="-";
+      currentFileName = "-";
     }
-    LOG.info("******* Filename : "+ currentFileName);
-    try{
-      if(localMode){
-        //load the jdbm file from tmp dir
-        tmpURI= this.getExecContext().getLocalWork().getTmpFileURI();
-        for(Map.Entry<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> entry: mapJoinTables.entrySet()){
+
+    try {
+      if (localMode) {
+        LOG.info("******* Load from tmp file uri ***");
+        tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
+        for (Map.Entry<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> entry : mapJoinTables
+            .entrySet()) {
           pos = entry.getKey();
-          hashtable=entry.getValue();
-          URI uri = new URI(tmpURI+Path.SEPARATOR+"-"+pos+"-"+currentFileName+".jdbm");
-          LOG.info("\tLoad back 1 JDBM file from tmp file uri:"+uri.toString());
-          Path path = new Path(tmpURI+Path.SEPARATOR+"-"+pos+"-"+currentFileName+".jdbm");
-          LOG.info("\tLoad back 1 JDBM file from tmp file uri:"+path.toString());
+          hashtable = entry.getValue();
+          String filePath = Utilities.generatePath(tmpURI, pos, currentFileName);
+          Path path = new Path(filePath);
+          LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString());
 
-          File jdbmFile = new File(path.toUri());
-          hashtable.initilizePersistentHash(jdbmFile);
+          hashtable.initilizePersistentHash(path.toUri().getPath());
+        }
+      } else {
+
+        Path[] localFiles = DistributedCache.getLocalCacheFiles(this.hconf);
+
+        for (Map.Entry<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> entry : mapJoinTables
+            .entrySet()) {
+          pos = entry.getKey();
+          hashtable = entry.getValue();
+          String suffix = Utilities.generateFileName(pos, currentFileName);
+          LOG.info("Looking for hashtable file with suffix: " + suffix);
+
+          boolean found = false;
+          for (int i = 0; i < localFiles.length; i++) {
+            Path path = localFiles[i];
+
+            if (path.toString().endsWith(suffix)) {
+              LOG.info("Matching suffix with cached file:" + path.toString());
+              LOG.info("\tInitializing the hashtable by cached file:" + path.toString());
+              hashtable.initilizePersistentHash(path.toString());
+              found = true;
+              LOG.info("\tLoad back 1 hashtable file from distributed cache:" + path.toString());
+              break;
+            }
+          }
+          if (!found) {
+            LOG.error("Load nothing from Distributed Cache");
+            throw new HiveException();
+          }
         }
-      }else{
-        //load the jdbm file from distributed cache
-         Path[] localFiles= DistributedCache.getLocalCacheFiles(this.hconf);
-         for(int i = 0;i<localFiles.length; i++){
-           Path path = localFiles[i];
-         }
-
-
-         for(Map.Entry<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> entry: mapJoinTables.entrySet()){
-           pos = entry.getKey();
-           hashtable=entry.getValue();
-           String suffix="-"+pos+"-"+currentFileName+".jdbm";
-           LOG.info("Looking for jdbm file with suffix: "+suffix);
-
-           boolean found=false;
-           for(int i = 0;i<localFiles.length; i++){
-             Path path = localFiles[i];
-
-             if(path.toString().endsWith(suffix)){
-               LOG.info("Matching suffix with cached file:"+path.toString());
-               File jdbmFile = new File(path.toString());
-               LOG.info("\tInitializing the JDBM by cached file:"+path.toString());
-               hashtable.initilizePersistentHash(jdbmFile);
-               found = true;
-               LOG.info("\tLoad back 1 JDBM file from distributed cache:"+path.toString());
-               break;
-             }
-           }
-           if(!found){
-             LOG.error("Load nothing from Distributed Cache");
-             throw new HiveException();
-           }
-         }
 
       }
-    }catch (Exception e){
+    } catch (Exception e) {
       e.printStackTrace();
       LOG.error("Load Hash Table error");
 
@@ -238,48 +218,50 @@ public class MapJoinOperator extends Abs
   public void processOp(Object row, int tag) throws HiveException {
 
     try {
-      if(firstRow){
-        //generate the map metadata
+      if (firstRow) {
+        // generate the map metadata
         generateMapMetaData();
         firstRow = false;
       }
-      if(this.getExecContext().inputFileChanged()){
-        loadJDBM();
+      if (this.getExecContext().inputFileChanged()) {
+        loadHashTable();
       }
 
       // get alias
       alias = order[tag];
-      //alias = (byte)tag;
+      // alias = (byte)tag;
 
       if ((lastAlias == null) || (!lastAlias.equals(alias))) {
         nextSz = joinEmitInterval;
       }
 
       // compute keys and values as StandardObjects
-      ArrayList<Object> key = JoinUtil.computeKeys(row, joinKeys.get(alias),
+      AbstractMapJoinKey key = JoinUtil.computeMapJoinKeys(row, joinKeys.get(alias),
           joinKeysObjectInspectors.get(alias));
       ArrayList<Object> value = JoinUtil.computeValues(row, joinValues.get(alias),
-          joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
-          joinFilterObjectInspectors.get(alias), noOuterJoin);
+          joinValuesObjectInspectors.get(alias), joinFilters.get(alias), joinFilterObjectInspectors
+              .get(alias), noOuterJoin);
 
 
       // Add the value to the ArrayList
-      storage.get((byte)tag).add(value);
+      storage.get((byte) tag).add(value);
 
       for (Byte pos : order) {
         if (pos.intValue() != tag) {
-          MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
-          MapJoinObjectValue o = mapJoinTables.get(pos).getMapJoinValueObject(keyMap);
+
+          MapJoinObjectValue o = mapJoinTables.get(pos).get(key);
+          MapJoinRowContainer<ArrayList<Object>> rowContainer = rowContainerMap.get(pos);
 
           // there is no join-value or join-key has all null elements
-          if (o == null || (hasAnyNulls(key))) {
+          if (o == null || key.hasAnyNulls()) {
             if (noOuterJoin) {
               storage.put(pos, emptyList);
             } else {
               storage.put(pos, dummyObjVectors[pos.intValue()]);
             }
           } else {
-            storage.put(pos, o.getObj());
+            rowContainer.reset(o.getObj());
+            storage.put(pos, rowContainer);
           }
         }
       }
@@ -288,7 +270,7 @@ public class MapJoinOperator extends Abs
       checkAndGenObject();
 
       // done with the row
-      storage.get((byte)tag).clear();
+      storage.get((byte) tag).clear();
 
       for (Byte pos : order) {
         if (pos.intValue() != tag) {
@@ -301,20 +283,22 @@ public class MapJoinOperator extends Abs
       throw new HiveException(e);
     }
   }
-  private String getFileName(String path){
-    if(path== null || path.length()==0) {
+
+  private String getFileName(String path) {
+    if (path == null || path.length() == 0) {
       return null;
     }
 
-    int last_separator = path.lastIndexOf(Path.SEPARATOR)+1;
+    int last_separator = path.lastIndexOf(Path.SEPARATOR) + 1;
     String fileName = path.substring(last_separator);
     return fileName;
 
   }
+
   @Override
   public void closeOp(boolean abort) throws HiveException {
 
-    if(mapJoinTables != null) {
+    if (mapJoinTables != null) {
       for (HashMapWrapper hashTable : mapJoinTables.values()) {
         hashTable.close();
       }
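
A minimal standalone sketch of the lookup that the rewritten loadHashTable() above performs against the DistributedCache local files: build the per-alias file suffix, scan the cached paths for a match, and fail if nothing matches. The generateFileName() helper below is a hypothetical stand-in for Hive's Utilities.generateFileName(), and the ".hashtable" extension is an assumption, not something shown in this diff.

// Illustrative only: mirrors the suffix-matching loop in the new loadHashTable().
import java.util.Arrays;
import java.util.List;

public class HashTableFileLookup {

  // Hypothetical stand-in for Utilities.generateFileName(pos, currentFileName);
  // the ".hashtable" extension is assumed here.
  static String generateFileName(byte pos, String currentFileName) {
    return "-" + pos + "-" + currentFileName + ".hashtable";
  }

  // Scan the localized DistributedCache paths for the one ending with the
  // suffix expected for this (alias position, input file) pair.
  static String findHashTableFile(List<String> localCachePaths, byte pos, String currentFileName) {
    String suffix = generateFileName(pos, currentFileName);
    for (String path : localCachePaths) {
      if (path.endsWith(suffix)) {
        return path;
      }
    }
    // Same failure mode as the operator: nothing usable found in the cache.
    throw new IllegalStateException("Load nothing from Distributed Cache, suffix: " + suffix);
  }

  public static void main(String[] args) {
    List<String> cached = Arrays.asList(
        "/tmp/mapred/local/archive/-1-part-00000.hashtable",
        "/tmp/mapred/local/archive/-2-part-00000.hashtable");
    System.out.println(findHashTableFile(cached, (byte) 2, "part-00000"));
  }
}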

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Fri Nov 12 06:12:44 2010
@@ -18,26 +18,39 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -47,11 +60,14 @@ import org.apache.hadoop.util.Reflection
 public class MapredLocalTask  extends Task<MapredLocalWork> implements Serializable {
 
   private Map<String, FetchOperator> fetchOperators;
-  private File jdbmFile;
   private JobConf job;
   public static final Log l4j = LogFactory.getLog("MapredLocalTask");
-  private MapOperator mo;
-  // not sure we need this exec context; but all the operators in the work
+  static final String HADOOP_MEM_KEY = "HADOOP_HEAPSIZE";
+  static final String HADOOP_OPTS_KEY = "HADOOP_OPTS";
+  static final String[] HIVE_SYS_PROP = {"build.dir", "build.dir.hive"};
+  public static MemoryMXBean memoryMXBean;
+
+  // not sure we need this exec context; but all the operators in the work
  // will pass this context through
   private final ExecMapperContext execContext = new ExecMapperContext();
 
@@ -59,6 +75,13 @@ public class MapredLocalTask  extends Ta
     super();
   }
 
+  public MapredLocalTask(MapredLocalWork plan, JobConf job, boolean isSilent) throws HiveException {
+    setWork(plan);
+    this.job = job;
+    LOG = LogFactory.getLog(this.getClass().getName());
+    console = new LogHelper(LOG, isSilent);
+  }
+
   @Override
   public void initialize(HiveConf conf, QueryPlan queryPlan,
       DriverContext driverContext) {
@@ -66,12 +89,159 @@ public class MapredLocalTask  extends Ta
     job = new JobConf(conf, ExecDriver.class);
   }
 
+  public static String now(){
+    Calendar cal = Calendar.getInstance();
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    return  sdf.format(cal.getTime());
+  }
+
+
+
   @Override
-public int execute(DriverContext driverContext){
+  public int execute(DriverContext driverContext){
+    try{
+      //generate the cmd line to run in the child jvm
+      //String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
+      Context ctx = driverContext.getCtx();
+      String hiveJar = conf.getJar();
+
+      String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
+      String libJarsOption;
+
+      // write out the plan to a local file
+      Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");
+      OutputStream out = FileSystem.getLocal(conf).create(planPath);
+      MapredLocalWork plan = getWork();
+      LOG.info("Generating plan file " + planPath.toString());
+      Utilities.serializeMapRedLocalWork(plan, out);
+
+      String isSilent = "true".equalsIgnoreCase(System
+          .getProperty("test.silent")) ? "-nolog" : "";
+
+      String jarCmd;
+
+      jarCmd = hiveJar + " " + ExecDriver.class.getName() ;
+
+      String hiveConfArgs = ExecDriver.generateCmdLine(conf);
+      String cmdLine = hadoopExec + " jar " + jarCmd + " -localtask -plan "
+          + planPath.toString() + " " + isSilent + " " + hiveConfArgs;
+
+      String workDir = (new File(".")).getCanonicalPath();
+      String files = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.FILE);
+
+      if (!files.isEmpty()) {
+        cmdLine = cmdLine + " -files " + files;
+
+        workDir = (new Path(ctx.getLocalTmpFileURI())).toUri().getPath();
+
+        if (! (new File(workDir)).mkdir()) {
+          throw new IOException ("Cannot create tmp working dir: " + workDir);
+        }
+
+        for (String f: StringUtils.split(files, ',')) {
+          Path p = new Path(f);
+          String target = p.toUri().getPath();
+          String link = workDir + Path.SEPARATOR + p.getName();
+          if (FileUtil.symLink(target, link) != 0) {
+            throw new IOException ("Cannot link to added file: " + target + " from: " + link);
+          }
+        }
+      }
+
+      LOG.info("Executing: " + cmdLine);
+      Process executor = null;
+
+      // Inherit Java system variables
+      String hadoopOpts;
+      StringBuilder sb = new StringBuilder();
+      Properties p = System.getProperties();
+      for (String element : HIVE_SYS_PROP) {
+        if (p.containsKey(element)) {
+          sb.append(" -D" + element + "=" + p.getProperty(element));
+        }
+      }
+      hadoopOpts = sb.toString();
+      // Inherit the environment variables
+      String[] env;
+      Map<String, String> variables = new HashMap(System.getenv());
+      // The user can specify the hadoop memory
+
+      //if ("local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT))) {
+        // if we are running in local mode - then the amount of memory used
+        // by the child jvm can no longer default to the memory used by the
+        // parent jvm
+        //int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
+      int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
+      if (hadoopMem == 0) {
+        // remove env var that would default child jvm to use parent's memory
+        // as default. child jvm would use default memory for a hadoop client
+        variables.remove(HADOOP_MEM_KEY);
+      } else {
+        // user specified the memory for local mode hadoop run
+        console.printInfo(" set heap size\t"+hadoopMem+"MB");
+        variables.put(HADOOP_MEM_KEY, String.valueOf(hadoopMem));
+      }
+      //} else {
+        // nothing to do - we are not running in local mode - only submitting
+        // the job via a child process. in this case it's appropriate that the
+        // child jvm use the same memory as the parent jvm
+
+      //}
+
+      if (variables.containsKey(HADOOP_OPTS_KEY)) {
+        variables.put(HADOOP_OPTS_KEY, variables.get(HADOOP_OPTS_KEY)
+            + hadoopOpts);
+      } else {
+        variables.put(HADOOP_OPTS_KEY, hadoopOpts);
+      }
+      env = new String[variables.size()];
+      int pos = 0;
+      for (Map.Entry<String, String> entry : variables.entrySet()) {
+        String name = entry.getKey();
+        String value = entry.getValue();
+        env[pos++] = name + "=" + value;
+      }
+
+      // Run ExecDriver in another JVM
+      executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
+
+      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
+          null, System.out);
+      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
+          null, System.err);
+
+      outPrinter.start();
+      errPrinter.start();
+
+      int exitVal = executor.waitFor();
+
+      if (exitVal != 0) {
+        LOG.error("Execution failed with exit status: " + exitVal);
+        console.printError("Mapred Local Task Failed. Give up the map join strategy");
+      } else {
+        LOG.info("Execution completed successfully");
+        console.printInfo("Mapred Local Task Running Successfully. Keep using map join strategy");
+      }
+
+      return exitVal;
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.error("Exception: " + e.getMessage());
+      return (1);
+    }
+  }
+
+
+
+  public int executeFromChildJVM(DriverContext driverContext){
+
     // check the local work
     if(work == null){
       return -1;
     }
+    memoryMXBean = ManagementFactory.getMemoryMXBean();
+    console.printInfo(Utilities.now()+"\tStarting to launch local task to process map join");
+    console.printInfo("\tmaximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
     fetchOperators = new HashMap<String, FetchOperator>();
     Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
     execContext.setJc(job);
@@ -92,14 +262,19 @@ public int execute(DriverContext driverC
       }else{
         startForward(inputFileChangeSenstive,null);
       }
+      console.printInfo(now()+"\tEnd of local task ");
     } catch (Throwable e) {
       if (e instanceof OutOfMemoryError) {
         // Don't create a new object if we are already out of memory
-        l4j.error("Out of Merror Error");
+        l4j.error("Out of Memory Error");
+        console.printError("[Warning] Small table is too large to put into memory");
+        return 2;
       } else {
         l4j.error("Hive Runtime Error: Map local work failed");
         e.printStackTrace();
       }
+    }finally{
+      console.printInfo(Utilities.now()+"\tFinish running local task");
     }
     return 0;
   }
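
For reference, a self-contained sketch of the environment handling that the new execute() method above performs before spawning the child JVM: inherit the parent environment, drop or override HADOOP_HEAPSIZE depending on HiveConf.ConfVars.HIVEHADOOPMAXMEM, append inherited options to HADOOP_OPTS, and flatten the map into the String[] form that Runtime.exec() expects. The heap size and option string in main() are made up for illustration.

import java.util.HashMap;
import java.util.Map;

public class ChildJvmEnvSketch {

  static final String HADOOP_MEM_KEY = "HADOOP_HEAPSIZE";
  static final String HADOOP_OPTS_KEY = "HADOOP_OPTS";

  static String[] buildEnv(int hadoopMem, String hadoopOpts) {
    // Start from the parent process environment.
    Map<String, String> variables = new HashMap<String, String>(System.getenv());

    if (hadoopMem == 0) {
      // No explicit setting: let the child JVM use the default Hadoop client memory.
      variables.remove(HADOOP_MEM_KEY);
    } else {
      // User specified the memory for the local hadoop run.
      variables.put(HADOOP_MEM_KEY, String.valueOf(hadoopMem));
    }

    // Append inherited -D options to any existing HADOOP_OPTS value.
    String existing = variables.get(HADOOP_OPTS_KEY);
    variables.put(HADOOP_OPTS_KEY, existing == null ? hadoopOpts : existing + hadoopOpts);

    // Flatten into the "NAME=value" array form expected by Runtime.exec().
    String[] env = new String[variables.size()];
    int pos = 0;
    for (Map.Entry<String, String> entry : variables.entrySet()) {
      env[pos++] = entry.getKey() + "=" + entry.getValue();
    }
    return env;
  }

  public static void main(String[] args) {
    for (String e : buildEnv(1024, " -Dbuild.dir=/tmp/hive-build")) {
      if (e.startsWith("HADOOP_")) {
        System.out.println(e);
      }
    }
  }
}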

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Fri Nov 12 06:12:44 2010
@@ -28,8 +28,8 @@ import org.apache.hadoop.hive.ql.plan.Fi
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.JDBMDummyDesc;
-import org.apache.hadoop.hive.ql.plan.JDBMSinkDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
 import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
@@ -87,10 +87,10 @@ public final class OperatorFactory {
         LateralViewJoinOperator.class));
     opvec.add(new OpTuple<LateralViewForwardDesc>(LateralViewForwardDesc.class,
         LateralViewForwardOperator.class));
-    opvec.add(new OpTuple<JDBMDummyDesc>(JDBMDummyDesc.class,
-        JDBMDummyOperator.class));
-    opvec.add(new OpTuple<JDBMSinkDesc>(JDBMSinkDesc.class,
-        JDBMSinkOperator.class));
+    opvec.add(new OpTuple<HashTableDummyDesc>(HashTableDummyDesc.class,
+        HashTableDummyOperator.class));
+    opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
+        HashTableSinkOperator.class));
   }
 
   public static <T extends Serializable> Operator<T> get(Class<T> opClass) {
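
The OperatorFactory change above only swaps the JDBM descriptor/operator registrations for the new HashTable ones, but the registration pattern is easy to see in isolation: a list of (descriptor class, operator class) tuples searched linearly, with the operator instantiated reflectively. The Fake* classes below are made-up stand-ins for the real Hive descriptor and operator types.

import java.util.ArrayList;
import java.util.List;

public class OperatorFactorySketch {

  // Made-up stand-ins for HashTableSinkDesc / HashTableSinkOperator.
  public static class FakeHashTableSinkDesc {}
  public static class FakeHashTableSinkOperator {
    @Override
    public String toString() {
      return "FakeHashTableSinkOperator";
    }
  }

  // (descriptor class, operator class) pair, in the spirit of OperatorFactory's OpTuple.
  static final class OpTuple {
    final Class<?> descClass;
    final Class<?> opClass;

    OpTuple(Class<?> descClass, Class<?> opClass) {
      this.descClass = descClass;
      this.opClass = opClass;
    }
  }

  static final List<OpTuple> opvec = new ArrayList<OpTuple>();
  static {
    opvec.add(new OpTuple(FakeHashTableSinkDesc.class, FakeHashTableSinkOperator.class));
  }

  // Linear scan of the registry, then reflective instantiation of the operator.
  static Object get(Class<?> descClass) throws Exception {
    for (OpTuple t : opvec) {
      if (t.descClass == descClass) {
        return t.opClass.newInstance();
      }
    }
    throw new IllegalArgumentException("No operator registered for " + descClass.getName());
  }

  public static void main(String[] args) throws Exception {
    System.out.println(get(FakeHashTableSinkDesc.class));
  }
}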

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Nov 12 06:12:44 2010
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.util.JoinUtil;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Fri Nov 12 06:12:44 2010
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.exec.pe
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.util.JoinUtil;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -164,7 +163,7 @@ public class SkewJoinHandler {
     // reset rowcontainer's serde, objectinspector, and tableDesc.
     for (int i = 0; i < numAliases; i++) {
       Byte alias = conf.getTagOrder()[i];
-      RowContainer<ArrayList<Object>> rc = joinOp.storage.get(Byte
+      RowContainer<ArrayList<Object>> rc = (RowContainer)joinOp.storage.get(Byte
           .valueOf((byte) i));
       if (rc != null) {
         rc.setSerDe(tblSerializers.get((byte) i), skewKeysTableObjectInspector
@@ -178,7 +177,7 @@ public class SkewJoinHandler {
     if (skewKeyInCurrentGroup) {
 
       String specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag);
-      RowContainer<ArrayList<Object>> bigKey = joinOp.storage.get(Byte
+      RowContainer<ArrayList<Object>> bigKey = (RowContainer)joinOp.storage.get(Byte
           .valueOf((byte) currBigKeyTag));
       Path outputPath = getOperatorOutputPath(specPath);
       FileSystem destFs = outputPath.getFileSystem(hconf);
@@ -188,7 +187,7 @@ public class SkewJoinHandler {
         if (((byte) i) == currBigKeyTag) {
           continue;
         }
-        RowContainer<ArrayList<Object>> values = joinOp.storage.get(Byte
+        RowContainer<ArrayList<Object>> values = (RowContainer)joinOp.storage.get(Byte
             .valueOf((byte) i));
         if (values != null) {
           specPath = conf.getSmallKeysDirMap().get((byte) currBigKeyTag).get(
@@ -216,7 +215,7 @@ public class SkewJoinHandler {
       skewKeyInCurrentGroup = false;
 
       for (int i = 0; i < numAliases; i++) {
-        RowContainer<ArrayList<Object>> rc = joinOp.storage.get(Byte
+        RowContainer<ArrayList<Object>> rc = (RowContainer)joinOp.storage.get(Byte
             .valueOf((byte) i));
         if (rc != null) {
           rc.setKeyObject(dummyKey);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Nov 12 06:12:44 2010
@@ -43,8 +43,10 @@ import java.io.UnsupportedEncodingExcept
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -88,11 +90,12 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.serde.Constants;
@@ -193,14 +196,13 @@ public final class Utilities {
   }
 
   /**
-   * Java 1.5 workaround. From
-   * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403
+   * Java 1.5 workaround. From http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403
    */
   public static class EnumDelegate extends DefaultPersistenceDelegate {
     @Override
     protected Expression instantiate(Object oldInstance, Encoder out) {
-      return new Expression(Enum.class, "valueOf", new Object[] {
-          oldInstance.getClass(), ((Enum<?>) oldInstance).name()});
+      return new Expression(Enum.class, "valueOf", new Object[] {oldInstance.getClass(),
+          ((Enum<?>) oldInstance).name()});
     }
 
     @Override
@@ -212,24 +214,26 @@ public final class Utilities {
   public static class MapDelegate extends DefaultPersistenceDelegate {
     @Override
     protected Expression instantiate(Object oldInstance, Encoder out) {
-      Map oldMap = (Map)oldInstance;
+      Map oldMap = (Map) oldInstance;
       HashMap newMap = new HashMap(oldMap);
       return new Expression(newMap, HashMap.class, "new", new Object[] {});
     }
+
     @Override
     protected boolean mutatesTo(Object oldInstance, Object newInstance) {
       return false;
     }
+
     @Override
     protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
-      java.util.Collection oldO = (java.util.Collection)oldInstance;
-      java.util.Collection newO = (java.util.Collection)newInstance;
+      java.util.Collection oldO = (java.util.Collection) oldInstance;
+      java.util.Collection newO = (java.util.Collection) newInstance;
 
       if (newO.size() != 0) {
-        out.writeStatement(new Statement(oldInstance, "clear", new Object[]{}));
+        out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
       }
       for (Iterator i = oldO.iterator(); i.hasNext();) {
-        out.writeStatement(new Statement(oldInstance, "add", new Object[]{i.next()}));
+        out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
       }
     }
   }
@@ -237,24 +241,26 @@ public final class Utilities {
   public static class SetDelegate extends DefaultPersistenceDelegate {
     @Override
     protected Expression instantiate(Object oldInstance, Encoder out) {
-      Set oldSet = (Set)oldInstance;
+      Set oldSet = (Set) oldInstance;
       HashSet newSet = new HashSet(oldSet);
       return new Expression(newSet, HashSet.class, "new", new Object[] {});
     }
+
     @Override
     protected boolean mutatesTo(Object oldInstance, Object newInstance) {
       return false;
     }
+
     @Override
     protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
-      java.util.Collection oldO = (java.util.Collection)oldInstance;
-      java.util.Collection newO = (java.util.Collection)newInstance;
+      java.util.Collection oldO = (java.util.Collection) oldInstance;
+      java.util.Collection newO = (java.util.Collection) newInstance;
 
       if (newO.size() != 0) {
-        out.writeStatement(new Statement(oldInstance, "clear", new Object[]{}));
+        out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
       }
       for (Iterator i = oldO.iterator(); i.hasNext();) {
-        out.writeStatement(new Statement(oldInstance, "add", new Object[]{i.next()}));
+        out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
       }
     }
 
@@ -263,24 +269,26 @@ public final class Utilities {
   public static class ListDelegate extends DefaultPersistenceDelegate {
     @Override
     protected Expression instantiate(Object oldInstance, Encoder out) {
-      List oldList = (List)oldInstance;
+      List oldList = (List) oldInstance;
       ArrayList newList = new ArrayList(oldList);
       return new Expression(newList, ArrayList.class, "new", new Object[] {});
     }
+
     @Override
     protected boolean mutatesTo(Object oldInstance, Object newInstance) {
       return false;
     }
+
     @Override
     protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
-      java.util.Collection oldO = (java.util.Collection)oldInstance;
-      java.util.Collection newO = (java.util.Collection)newInstance;
+      java.util.Collection oldO = (java.util.Collection) oldInstance;
+      java.util.Collection newO = (java.util.Collection) newInstance;
 
       if (newO.size() != 0) {
-        out.writeStatement(new Statement(oldInstance, "clear", new Object[]{}));
+        out.writeStatement(new Statement(oldInstance, "clear", new Object[] {}));
       }
       for (Iterator i = oldO.iterator(); i.hasNext();) {
-        out.writeStatement(new Statement(oldInstance, "add", new Object[]{i.next()}));
+        out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()}));
       }
     }
 
@@ -297,12 +305,12 @@ public final class Utilities {
       // Serialize the plan to the default hdfs instance
       // Except for hadoop local mode execution where we should be
       // able to get the plan directly from the cache
-      if(!HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local")) {
+      if (!HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local")) {
         // use the default file system of the job
         FileSystem fs = planPath.getFileSystem(job);
         FSDataOutputStream out = fs.create(planPath);
         serializeMapRedWork(w, out);
-        
+
         // Set up distributed cache
         DistributedCache.createSymlink(job);
         String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN" + jobID;
@@ -310,7 +318,7 @@ public final class Utilities {
 
         // set replication of the plan file to a high number. we use the same
         // replication factor as used by the hadoop jobclient for job.xml etc.
-        short replication = (short)job.getInt("mapred.submit.replication", 10);
+        short replication = (short) job.getInt("mapred.submit.replication", 10);
         fs.setReplication(planPath, replication);
       }
 
@@ -324,7 +332,7 @@ public final class Utilities {
   }
 
   public static String getHiveJobID(Configuration job) {
-    String planPath= HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
+    String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
     if (planPath != null) {
       return (new Path(planPath)).getName();
     }
@@ -346,17 +354,15 @@ public final class Utilities {
     }
   }
 
-  public static ExprNodeDesc deserializeExpression(
-    String s, Configuration conf) {
-    byte [] bytes;
+  public static ExprNodeDesc deserializeExpression(String s, Configuration conf) {
+    byte[] bytes;
     try {
       bytes = s.getBytes("UTF-8");
     } catch (UnsupportedEncodingException ex) {
       throw new RuntimeException("UTF-8 support required", ex);
     }
     ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-    XMLDecoder decoder = new XMLDecoder(
-      bais, null, null, conf.getClassLoader());
+    XMLDecoder decoder = new XMLDecoder(bais, null, null, conf.getClassLoader());
     try {
       ExprNodeDesc expr = (ExprNodeDesc) decoder.readObject();
       return expr;
@@ -368,36 +374,29 @@ public final class Utilities {
   /**
    * Serialize a single Task.
    */
-  public static void serializeTasks(Task<? extends Serializable> t,
-      OutputStream out) {
+  public static void serializeTasks(Task<? extends Serializable> t, OutputStream out) {
     XMLEncoder e = new XMLEncoder(out);
     // workaround for java 1.5
     e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
     e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-    e.setPersistenceDelegate(Operator.ProgressCounter.class,
-        new EnumDelegate());
+    e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
 
     e.writeObject(t);
     e.close();
   }
 
-  public static   class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
+  public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
     @Override
     protected Expression instantiate(Object oldInstance, Encoder out) {
-      return new Expression(oldInstance,
-                            oldInstance.getClass(),
-                            "new",
-                            null);
+      return new Expression(oldInstance, oldInstance.getClass(), "new", null);
     }
 
     @Override
-    protected void initialize(Class type, Object oldInstance, Object newInstance,
-                              Encoder out) {
+    protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) {
       Iterator ite = ((Collection) oldInstance).iterator();
       while (ite.hasNext()) {
-          out.writeStatement(new Statement(oldInstance, "add",
-                                           new Object[] { ite.next() }));
-        }
+        out.writeStatement(new Statement(oldInstance, "add", new Object[] {ite.next()}));
+      }
     }
   }
 
@@ -415,8 +414,7 @@ public final class Utilities {
     // workaround for java 1.5
     e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
     e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-    e.setPersistenceDelegate(Operator.ProgressCounter.class,
-        new EnumDelegate());
+    e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
 
     e.setPersistenceDelegate(org.datanucleus.store.types.sco.backed.Map.class, new MapDelegate());
     e.setPersistenceDelegate(org.datanucleus.store.types.sco.backed.List.class, new ListDelegate());
@@ -428,8 +426,7 @@ public final class Utilities {
   /**
    * Deserialize the whole query plan.
    */
-  public static QueryPlan deserializeQueryPlan(InputStream in,
-      Configuration conf) {
+  public static QueryPlan deserializeQueryPlan(InputStream in, Configuration conf) {
     XMLDecoder d = new XMLDecoder(in, null, null, conf.getClassLoader());
     QueryPlan ret = (QueryPlan) d.readObject();
     d.close();
@@ -437,9 +434,8 @@ public final class Utilities {
   }
 
   /**
-   * Serialize the mapredWork object to an output stream. DO NOT use this to
-   * write to standard output since it closes the output stream.
-   * DO USE mapredWork.toXML() instead.
+   * Serialize the mapredWork object to an output stream. DO NOT use this to write to standard
+   * output since it closes the output stream. DO USE mapredWork.toXML() instead.
    */
   public static void serializeMapRedWork(MapredWork w, OutputStream out) {
     XMLEncoder e = new XMLEncoder(out);
@@ -450,8 +446,7 @@ public final class Utilities {
     e.close();
   }
 
-  public static MapredWork deserializeMapRedWork(InputStream in,
-      Configuration conf) {
+  public static MapredWork deserializeMapRedWork(InputStream in, Configuration conf) {
     XMLDecoder d = new XMLDecoder(in, null, null, conf.getClassLoader());
     MapredWork ret = (MapredWork) d.readObject();
     d.close();
@@ -459,6 +454,26 @@ public final class Utilities {
   }
 
   /**
+   * Serialize the mapredLocalWork object to an output stream. DO NOT use this to write to standard
+   * output since it closes the output stream. DO USE mapredWork.toXML() instead.
+   */
+  public static void serializeMapRedLocalWork(MapredLocalWork w, OutputStream out) {
+    XMLEncoder e = new XMLEncoder(out);
+    // workaround for java 1.5
+    e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
+    e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
+    e.writeObject(w);
+    e.close();
+  }
+
+  public static MapredLocalWork deserializeMapRedLocalWork(InputStream in, Configuration conf) {
+    XMLDecoder d = new XMLDecoder(in, null, null, conf.getClassLoader());
+    MapredLocalWork ret = (MapredLocalWork) d.readObject();
+    d.close();
+    return (ret);
+  }
+
+  /**
    * Tuple.
    *
    * @param <T>
@@ -507,19 +522,17 @@ public final class Utilities {
   public static Random randGen = new Random();
 
   /**
-   * Gets the task id if we are running as a Hadoop job. Gets a random number
-   * otherwise.
+   * Gets the task id if we are running as a Hadoop job. Gets a random number otherwise.
    */
   public static String getTaskId(Configuration hconf) {
     String taskid = (hconf == null) ? null : hconf.get("mapred.task.id");
     if ((taskid == null) || taskid.equals("")) {
       return ("" + Math.abs(randGen.nextInt()));
     } else {
-       /* extract the task and attempt id from the hadoop taskid.
-          in version 17 the leading component was 'task_'. thereafter
-          the leading component is 'attempt_'. in 17 - hadoop also
-          seems to have used _map_ and _reduce_ to denote map/reduce
-          task types
+      /*
+       * extract the task and attempt id from the hadoop taskid. in version 17 the leading component
+       * was 'task_'. thereafter the leading component is 'attempt_'. in 17 - hadoop also seems to
+       * have used _map_ and _reduce_ to denote map/reduce task types
        */
       String ret = taskid.replaceAll(".*_[mr]_", "").replaceAll(".*_(map|reduce)_", "");
       return (ret);
@@ -587,29 +600,26 @@ public final class Utilities {
   }
 
   public static TableDesc getTableDesc(Table tbl) {
-    return (new TableDesc(tbl.getDeserializer().getClass(), tbl
-        .getInputFormatClass(), tbl.getOutputFormatClass(), tbl.getSchema()));
+    return (new TableDesc(tbl.getDeserializer().getClass(), tbl.getInputFormatClass(), tbl
+        .getOutputFormatClass(), tbl.getSchema()));
   }
 
   // column names and column types are all delimited by comma
   public static TableDesc getTableDesc(String cols, String colTypes) {
     return (new TableDesc(LazySimpleSerDe.class, SequenceFileInputFormat.class,
         HiveSequenceFileOutputFormat.class, Utilities.makeProperties(
-        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, ""
-        + Utilities.ctrlaCode,
-        org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
-        org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
+            org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
+            org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
+            org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
   }
 
-  public static PartitionDesc getPartitionDesc(Partition part)
-      throws HiveException {
+  public static PartitionDesc getPartitionDesc(Partition part) throws HiveException {
     return (new PartitionDesc(part));
   }
 
-  public static void addMapWork(MapredWork mr, Table tbl, String alias,
-      Operator<?> work) {
-    mr.addMapWork(tbl.getDataLocation().getPath(), alias, work,
-        new PartitionDesc(getTableDesc(tbl), null));
+  public static void addMapWork(MapredWork mr, Table tbl, String alias, Operator<?> work) {
+    mr.addMapWork(tbl.getDataLocation().getPath(), alias, work, new PartitionDesc(
+        getTableDesc(tbl), null));
   }
 
   private static String getOpTreeSkel_helper(Operator<?> op, String indent) {
@@ -641,8 +651,8 @@ public final class Utilities {
     return Character.isWhitespace((char) c);
   }
 
-  public static boolean contentsEqual(InputStream is1, InputStream is2,
-      boolean ignoreWhitespace) throws IOException {
+  public static boolean contentsEqual(InputStream is1, InputStream is2, boolean ignoreWhitespace)
+      throws IOException {
     try {
       if ((is1 == is2) || (is1 == null && is2 == null)) {
         return true;
@@ -710,8 +720,7 @@ public final class Utilities {
     EOF, TERMINATED
   }
 
-  public static StreamStatus readColumn(DataInput in, OutputStream out)
-      throws IOException {
+  public static StreamStatus readColumn(DataInput in, OutputStream out) throws IOException {
 
     while (true) {
       int b;
@@ -731,8 +740,8 @@ public final class Utilities {
   }
 
   /**
-   * Convert an output stream to a compressed output stream based on codecs and
-   * compression options specified in the Job Configuration.
+   * Convert an output stream to a compressed output stream based on codecs and compression options
+   * specified in the Job Configuration.
    *
    * @param jc
    *          Job Configuration
@@ -747,9 +756,8 @@ public final class Utilities {
   }
 
   /**
-   * Convert an output stream to a compressed output stream based on codecs
-   * codecs in the Job Configuration. Caller specifies directly whether file is
-   * compressed or not
+   * Convert an output stream to a compressed output stream based on codecs codecs in the Job
+   * Configuration. Caller specifies directly whether file is compressed or not
    *
    * @param jc
    *          Job Configuration
@@ -759,11 +767,11 @@ public final class Utilities {
    *          whether the output stream needs to be compressed or not
    * @return compressed output stream
    */
-  public static OutputStream createCompressedStream(JobConf jc,
-      OutputStream out, boolean isCompressed) throws IOException {
+  public static OutputStream createCompressedStream(JobConf jc, OutputStream out,
+      boolean isCompressed) throws IOException {
     if (isCompressed) {
-      Class<? extends CompressionCodec> codecClass = FileOutputFormat
-          .getOutputCompressorClass(jc, DefaultCodec.class);
+      Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(jc,
+          DefaultCodec.class);
       CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
       return codec.createOutputStream(out);
     } else {
@@ -772,8 +780,8 @@ public final class Utilities {
   }
 
   /**
-   * Based on compression option and configured output codec - get extension for
-   * output file. This is only required for text files - not sequencefiles
+   * Based on compression option and configured output codec - get extension for output file. This
+   * is only required for text files - not sequencefiles
    *
    * @param jc
    *          Job Configuration
@@ -785,8 +793,8 @@ public final class Utilities {
     if (!isCompressed) {
       return "";
     } else {
-      Class<? extends CompressionCodec> codecClass = FileOutputFormat
-          .getOutputCompressorClass(jc, DefaultCodec.class);
+      Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(jc,
+          DefaultCodec.class);
       CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
       return codec.getDefaultExtension();
     }
@@ -807,17 +815,15 @@ public final class Utilities {
    *          Java Class for value
    * @return output stream over the created sequencefile
    */
-  public static SequenceFile.Writer createSequenceWriter(JobConf jc,
-      FileSystem fs, Path file, Class<?> keyClass, Class<?> valClass)
-      throws IOException {
+  public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs, Path file,
+      Class<?> keyClass, Class<?> valClass) throws IOException {
     boolean isCompressed = FileOutputFormat.getCompressOutput(jc);
     return createSequenceWriter(jc, fs, file, keyClass, valClass, isCompressed);
   }
 
   /**
-   * Create a sequencefile output stream based on job configuration Uses user
-   * supplied compression flag (rather than obtaining it from the Job
-   * Configuration).
+   * Create a sequencefile output stream based on job configuration Uses user supplied compression
+   * flag (rather than obtaining it from the Job Configuration).
    *
    * @param jc
    *          Job configuration
@@ -831,26 +837,23 @@ public final class Utilities {
    *          Java Class for value
    * @return output stream over the created sequencefile
    */
-  public static SequenceFile.Writer createSequenceWriter(JobConf jc,
-      FileSystem fs, Path file, Class<?> keyClass, Class<?> valClass,
-      boolean isCompressed) throws IOException {
+  public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs, Path file,
+      Class<?> keyClass, Class<?> valClass, boolean isCompressed) throws IOException {
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
     Class codecClass = null;
     if (isCompressed) {
       compressionType = SequenceFileOutputFormat.getOutputCompressionType(jc);
-      codecClass = FileOutputFormat.getOutputCompressorClass(jc,
-          DefaultCodec.class);
+      codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class);
       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
     }
-    return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass,
-        compressionType, codec));
+    return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec));
 
   }
 
   /**
-   * Create a RCFile output stream based on job configuration Uses user supplied
-   * compression flag (rather than obtaining it from the Job Configuration).
+   * Create a RCFile output stream based on job configuration Uses user supplied compression flag
+   * (rather than obtaining it from the Job Configuration).
    *
    * @param jc
    *          Job configuration
@@ -860,13 +863,12 @@ public final class Utilities {
    *          Path to be created
    * @return output stream over the created rcfile
    */
-  public static RCFile.Writer createRCFileWriter(JobConf jc, FileSystem fs,
-      Path file, boolean isCompressed) throws IOException {
+  public static RCFile.Writer createRCFileWriter(JobConf jc, FileSystem fs, Path file,
+      boolean isCompressed) throws IOException {
     CompressionCodec codec = null;
     Class<?> codecClass = null;
     if (isCompressed) {
-      codecClass = FileOutputFormat.getOutputCompressorClass(jc,
-          DefaultCodec.class);
+      codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class);
       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
     }
     return new RCFile.Writer(fs, jc, file, null, codec);
@@ -875,8 +877,7 @@ public final class Utilities {
   /**
    * Shamelessly cloned from GenericOptionsParser.
    */
-  public static String realFile(String newFile, Configuration conf)
-      throws IOException {
+  public static String realFile(String newFile, Configuration conf) throws IOException {
     Path path = new Path(newFile);
     URI pathURI = path.toUri();
     FileSystem fs;
@@ -897,8 +898,7 @@ public final class Utilities {
     }
     String file = path.makeQualified(fs).toString();
     // For compatibility with hadoop 0.17, change file:/a/b/c to file:///a/b/c
-    if (StringUtils.startsWith(file, "file:/")
-        && !StringUtils.startsWith(file, "file:///")) {
+    if (StringUtils.startsWith(file, "file:/") && !StringUtils.startsWith(file, "file:///")) {
       file = "file:///" + file.substring("file:/".length());
     }
     return file;
@@ -950,9 +950,8 @@ public final class Utilities {
   }
 
   /**
-   * Rename src to dst, or in the case dst already exists, move files in src to
-   * dst. If there is an existing file with the same name, the new file's name
-   * will be appended with "_1", "_2", etc.
+   * Rename src to dst, or in the case dst already exists, move files in src to dst. If there is an
+   * existing file with the same name, the new file's name will be appended with "_1", "_2", etc.
    *
    * @param fs
    *          the FileSystem where src and dst are on.
@@ -962,17 +961,15 @@ public final class Utilities {
    *          the target directory
    * @throws IOException
    */
-  public static void rename(FileSystem fs, Path src, Path dst)
-      throws IOException, HiveException {
+  public static void rename(FileSystem fs, Path src, Path dst) throws IOException, HiveException {
     if (!fs.rename(src, dst)) {
       throw new HiveException("Unable to move: " + src + " to: " + dst);
     }
   }
 
   /**
-   * Rename src to dst, or in the case dst already exists, move files in src to
-   * dst. If there is an existing file with the same name, the new file's name
-   * will be appended with "_1", "_2", etc.
+   * Rename src to dst, or in the case dst already exists, move files in src to dst. If there is an
+   * existing file with the same name, the new file's name will be appended with "_1", "_2", etc.
    *
    * @param fs
    *          the FileSystem where src and dst are on.
@@ -982,8 +979,8 @@ public final class Utilities {
    *          the target directory
    * @throws IOException
    */
-  public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst)
-      throws IOException, HiveException {
+  public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws IOException,
+      HiveException {
     if (!fs.exists(dst)) {
       if (!fs.rename(src, dst)) {
         throw new HiveException("Unable to move: " + src + " to: " + dst);
@@ -1010,18 +1007,18 @@ public final class Utilities {
   }
 
   /**
-   * The first group will contain the task id. The second group is the optional
-   * extension. The file name looks like: "0_0" or "0_0.gz". There may be a leading
-   * prefix (tmp_). Since getTaskId() can return an integer only - this should match
-   * a pure integer as well
+   * The first group will contain the task id. The second group is the optional extension. The file
+   * name looks like: "0_0" or "0_0.gz". There may be a leading prefix (tmp_). Since getTaskId() can
+   * return an integer only, this should match a pure integer as well.
    */
   private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*?([0-9]+)(_[0-9])?(\\..*)?$");
 
   /**
-   * Get the task id from the filename.
-   * It is assumed that the filename is derived from the output of getTaskId
+   * Get the task id from the filename. It is assumed that the filename is derived from the output
+   * of getTaskId
    *
-   * @param filename filename to extract taskid from
+   * @param filename
+   *          filename to extract taskid from
    */
   public static String getTaskIdFromFilename(String filename) {
     String taskId = filename;
@@ -1032,8 +1029,8 @@ public final class Utilities {
 
     Matcher m = fileNameTaskIdRegex.matcher(taskId);
     if (!m.matches()) {
-      LOG.warn("Unable to get task id from file name: " + filename
-               + ". Using last component" + taskId + " as task id.");
+      LOG.warn("Unable to get task id from file name: " + filename + ". Using last component"
+          + taskId + " as task id.");
     } else {
       taskId = m.group(1);
     }
@@ -1042,17 +1039,16 @@ public final class Utilities {
   }
 
   /**
-   * Replace the task id from the filename.
-   * It is assumed that the filename is derived from the output of getTaskId
+   * Replace the task id from the filename. It is assumed that the filename is derived from the
+   * output of getTaskId
    *
-   * @param filename filename to replace taskid
-   * "0_0" or "0_0.gz" by 33 to
-   * "33_0" or "33_0.gz"
+   * @param filename
+   *          filename in which to replace the taskid: "0_0" or "0_0.gz" by 33 to "33_0" or "33_0.gz"
    */
   public static String replaceTaskIdFromFilename(String filename, int bucketNum) {
     String taskId = getTaskIdFromFilename(filename);
     String newTaskId = replaceTaskId(taskId, bucketNum);
-    String ret =  replaceTaskIdFromFilename(filename, taskId, newTaskId);
+    String ret = replaceTaskIdFromFilename(filename, taskId, newTaskId);
     return (ret);
   }
 
@@ -1062,21 +1058,22 @@ public final class Utilities {
     int taskIdLen = taskId.length();
     StringBuffer s = new StringBuffer();
     for (int i = 0; i < taskIdLen - bucketNumLen; i++) {
-        s.append("0");
+      s.append("0");
     }
     return s.toString() + strBucketNum;
   }
 
   /**
-   * Replace the oldTaskId appearing in the filename by the newTaskId.
-   * The string oldTaskId could appear multiple times, we should only replace the last one.
+   * Replace the oldTaskId appearing in the filename by the newTaskId. The string oldTaskId could
+   * appear multiple times; only the last occurrence is replaced.
+   *
    * @param filename
    * @param oldTaskId
    * @param newTaskId
    * @return
    */
-  private static String replaceTaskIdFromFilename(String filename,
-      String oldTaskId, String newTaskId) {
+  private static String replaceTaskIdFromFilename(String filename, String oldTaskId,
+      String newTaskId) {
 
     String[] spl = filename.split(oldTaskId);
 
@@ -1085,27 +1082,31 @@ public final class Utilities {
     }
 
     StringBuffer snew = new StringBuffer();
-    for (int idx = 0; idx < spl.length-1; idx++) {
+    for (int idx = 0; idx < spl.length - 1; idx++) {
       if (idx > 0) {
         snew.append(oldTaskId);
       }
       snew.append(spl[idx]);
     }
     snew.append(newTaskId);
-    snew.append(spl[spl.length-1]);
+    snew.append(spl[spl.length - 1]);
     return snew.toString();
   }
 
   /**
    * Get all file status from a root path and recursively go deep into certain levels.
-   * @param path the root path
-   * @param level the depth of directory should explore
-   * @param fs the file system
+   *
+   * @param path
+   *          the root path
+   * @param level
+   *          the depth of directories to explore
+   * @param fs
+   *          the file system
    * @return array of FileStatus
    * @throws IOException
    */
-  public static FileStatus[] getFileStatusRecurse(Path path, int level,
-      FileSystem fs) throws IOException {
+  public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
+      throws IOException {
 
     // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
     StringBuilder sb = new StringBuilder(path.toUri().getPath());
@@ -1117,8 +1118,8 @@ public final class Utilities {
   }
 
   /**
-   * Remove all temporary files and duplicate (double-committed) files from a
-   * given directory.
+   * Remove all temporary files and duplicate (double-committed) files from a given directory.
+   *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
   public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
@@ -1126,12 +1127,12 @@ public final class Utilities {
   }
 
   /**
-   * Remove all temporary files and duplicate (double-committed) files from a
-   * given directory.
+   * Remove all temporary files and duplicate (double-committed) files from a given directory.
+   *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
-  public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path, DynamicPartitionCtx dpCtx)
-      throws IOException {
+  public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+      DynamicPartitionCtx dpCtx) throws IOException {
     if (path == null) {
       return null;
     }
@@ -1142,7 +1143,8 @@ public final class Utilities {
       HashMap<String, FileStatus> taskIDToFile = null;
 
       for (int i = 0; i < parts.length; ++i) {
-        assert parts[i].isDir(): "dynamic partition " + parts[i].getPath() + " is not a direcgtory";
+        assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
+            + " is not a directory";
         FileStatus[] items = fs.listStatus(parts[i].getPath());
 
         // remove empty directory since DP insert should not generate empty partitions.
@@ -1162,7 +1164,7 @@ public final class Utilities {
           // get the missing buckets and generate empty buckets
           String taskID1 = taskIDToFile.keySet().iterator().next();
           Path bucketPath = taskIDToFile.values().iterator().next().getPath();
-          for (int j = 0; j < dpCtx.getNumBuckets(); ++j ) {
+          for (int j = 0; j < dpCtx.getNumBuckets(); ++j) {
             String taskID2 = replaceTaskId(taskID1, j);
             if (!taskIDToFile.containsKey(taskID2)) {
               // create empty bucket, file name should be derived from taskID2
@@ -1177,11 +1179,10 @@ public final class Utilities {
       removeTempOrDuplicateFiles(items, fs);
     }
     return result;
- }
+  }
 
-  public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(
-      FileStatus[] items, FileSystem fs)
-      throws IOException {
+  public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items,
+      FileSystem fs) throws IOException {
 
     if (items == null || fs == null) {
       return null;
@@ -1214,12 +1215,12 @@ public final class Utilities {
           long len1 = toDelete.getLen();
           long len2 = taskIdToFile.get(taskId).getLen();
           if (!fs.delete(toDelete.getPath(), true)) {
-            throw new IOException("Unable to delete duplicate file: "
-                + toDelete.getPath() + ". Existing file: " + taskIdToFile.get(taskId).getPath());
+            throw new IOException("Unable to delete duplicate file: " + toDelete.getPath()
+                + ". Existing file: " + taskIdToFile.get(taskId).getPath());
           } else {
             LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
-                + len1 + ". Existing file: " +  taskIdToFile.get(taskId).getPath()
-                + " with length " + len2);
+                + len1 + ". Existing file: " + taskIdToFile.get(taskId).getPath() + " with length "
+                + len2);
           }
         }
       }
@@ -1237,8 +1238,7 @@ public final class Utilities {
    * @param newPaths
    *          Array of classpath elements
    */
-  public static ClassLoader addToClassPath(ClassLoader cloader,
-      String[] newPaths) throws Exception {
+  public static ClassLoader addToClassPath(ClassLoader cloader, String[] newPaths) throws Exception {
     URLClassLoader loader = (URLClassLoader) cloader;
     List<URL> curPath = Arrays.asList(loader.getURLs());
     ArrayList<URL> newPath = new ArrayList<URL>();
@@ -1270,8 +1270,7 @@ public final class Utilities {
    * @param pathsToRemove
    *          Array of classpath elements
    */
-  public static void removeFromClassPath(String[] pathsToRemove)
-      throws Exception {
+  public static void removeFromClassPath(String[] pathsToRemove) throws Exception {
     Thread curThread = Thread.currentThread();
     URLClassLoader loader = (URLClassLoader) curThread.getContextClassLoader();
     Set<URL> newPath = new HashSet<URL>(Arrays.asList(loader.getURLs()));
@@ -1307,8 +1306,7 @@ public final class Utilities {
     return names;
   }
 
-  public static List<String> getColumnNamesFromFieldSchema(
-      List<FieldSchema> partCols) {
+  public static List<String> getColumnNamesFromFieldSchema(List<FieldSchema> partCols) {
     List<String> names = new ArrayList<String>();
     for (FieldSchema o : partCols) {
       names.add(o.getName());
@@ -1344,8 +1342,8 @@ public final class Utilities {
     return names;
   }
 
-  public static void validateColumnNames(List<String> colNames,
-      List<String> checkCols) throws SemanticException {
+  public static void validateColumnNames(List<String> colNames, List<String> checkCols)
+      throws SemanticException {
     Iterator<String> checkColsIter = checkCols.iterator();
     while (checkColsIter.hasNext()) {
       String toCheck = checkColsIter.next();
@@ -1365,16 +1363,15 @@ public final class Utilities {
   }
 
   /**
-   * Gets the default notification interval to send progress updates to the
-   * tracker. Useful for operators that may not output data for a while.
+   * Gets the default notification interval to send progress updates to the tracker. Useful for
+   * operators that may not output data for a while.
    *
    * @param hconf
    * @return the interval in milliseconds
    */
   public static int getDefaultNotificationInterval(Configuration hconf) {
     int notificationInterval;
-    Integer expInterval = Integer.decode(hconf
-        .get("mapred.tasktracker.expiry.interval"));
+    Integer expInterval = Integer.decode(hconf.get("mapred.tasktracker.expiry.interval"));
 
     if (expInterval != null) {
       notificationInterval = expInterval.intValue() / 2;
@@ -1386,12 +1383,14 @@ public final class Utilities {
   }
 
   /**
-   * Copies the storage handler properties configured for a table descriptor
-   * to a runtime job configuration.
+   * Copies the storage handler properties configured for a table descriptor to a runtime job
+   * configuration.
    *
-   * @param tbl table descriptor from which to read
+   * @param tbl
+   *          table descriptor from which to read
    *
-   * @param job configuration which receives configured properties
+   * @param job
+   *          configuration which receives configured properties
    */
   public static void copyTableJobPropertiesToConf(TableDesc tbl, JobConf job) {
     Map<String, String> jobProperties = tbl.getJobProperties();
@@ -1406,14 +1405,17 @@ public final class Utilities {
   /**
    * Calculate the total size of input files.
    *
-   * @param job  the hadoop job conf.
-   * @param work map reduce job plan
-   * @param filter filter to apply to the input paths before calculating size
+   * @param job
+   *          the hadoop job conf.
+   * @param work
+   *          map reduce job plan
+   * @param filter
+   *          filter to apply to the input paths before calculating size
    * @return the summary of all the input paths.
    * @throws IOException
    */
-  public static ContentSummary getInputSummary
-    (Context ctx, MapredWork work, PathFilter filter) throws IOException {
+  public static ContentSummary getInputSummary(Context ctx, MapredWork work, PathFilter filter)
+      throws IOException {
 
     long[] summary = {0, 0, 0};
 
@@ -1422,7 +1424,7 @@ public final class Utilities {
       try {
         Path p = new Path(path);
 
-        if(filter != null && !filter.accept(p)) {
+        if (filter != null && !filter.accept(p)) {
           continue;
         }
 
@@ -1459,19 +1461,18 @@ public final class Utilities {
     return true;
   }
 
-  public static List<ExecDriver> getMRTasks (List<Task<? extends Serializable>> tasks) {
-    List<ExecDriver> mrTasks = new ArrayList<ExecDriver> ();
-    if(tasks !=  null) {
+  public static List<ExecDriver> getMRTasks(List<Task<? extends Serializable>> tasks) {
+    List<ExecDriver> mrTasks = new ArrayList<ExecDriver>();
+    if (tasks != null) {
       getMRTasks(tasks, mrTasks);
     }
     return mrTasks;
   }
 
-  private static void getMRTasks (List<Task<? extends Serializable>> tasks,
-                                  List<ExecDriver> mrTasks) {
+  private static void getMRTasks(List<Task<? extends Serializable>> tasks, List<ExecDriver> mrTasks) {
     for (Task<? extends Serializable> task : tasks) {
-      if (task instanceof ExecDriver && !mrTasks.contains((ExecDriver)task)) {
-        mrTasks.add((ExecDriver)task);
+      if (task instanceof ExecDriver && !mrTasks.contains((ExecDriver) task)) {
+        mrTasks.add((ExecDriver) task);
       }
 
       if (task.getDependentTasks() != null) {
@@ -1485,45 +1486,43 @@ public final class Utilities {
   }
 
   /**
-   * Construct a list of full partition spec from Dynamic Partition Context and
-   * the directory names corresponding to these dynamic partitions.
+   * Construct a list of full partition specs from the Dynamic Partition Context and the directory
+   * names corresponding to these dynamic partitions.
    */
   public static List<LinkedHashMap<String, String>> getFullDPSpecs(Configuration conf,
-      DynamicPartitionCtx dpCtx)
-      throws HiveException {
+      DynamicPartitionCtx dpCtx) throws HiveException {
 
     try {
       Path loadPath = new Path(dpCtx.getRootPath());
       FileSystem fs = loadPath.getFileSystem(conf);
-    	int numDPCols = dpCtx.getNumDPCols();
-    	FileStatus[] status = Utilities.getFileStatusRecurse(loadPath, numDPCols, fs);
+      int numDPCols = dpCtx.getNumDPCols();
+      FileStatus[] status = Utilities.getFileStatusRecurse(loadPath, numDPCols, fs);
 
-    	if (status.length == 0) {
-    	  LOG.warn("No partition is genereated by dynamic partitioning");
-    	  return null;
-    	}
-
-    	// partial partition specification
-    	Map<String, String> partSpec = dpCtx.getPartSpec();
-
-    	// list of full partition specification
-    	List<LinkedHashMap<String, String>> fullPartSpecs =
-    	  new ArrayList<LinkedHashMap<String, String>>();
-
-    	// for each dynamically created DP directory, construct a full partition spec
-    	// and load the partition based on that
-    	for (int i= 0; i < status.length; ++i) {
-    	  // get the dynamically created directory
-    	  Path partPath = status[i].getPath();
-    	  assert fs.getFileStatus(partPath).isDir():
-    	    "partitions " + partPath + " is not a directory !";
-
-    	  // generate a full partition specification
-    	  LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
-      	Warehouse.makeSpecFromName(fullPartSpec, partPath);
-      	fullPartSpecs.add(fullPartSpec);
-    	}
-    	return fullPartSpecs;
+      if (status.length == 0) {
+        LOG.warn("No partition is genereated by dynamic partitioning");
+        return null;
+      }
+
+      // partial partition specification
+      Map<String, String> partSpec = dpCtx.getPartSpec();
+
+      // list of full partition specification
+      List<LinkedHashMap<String, String>> fullPartSpecs = new ArrayList<LinkedHashMap<String, String>>();
+
+      // for each dynamically created DP directory, construct a full partition spec
+      // and load the partition based on that
+      for (int i = 0; i < status.length; ++i) {
+        // get the dynamically created directory
+        Path partPath = status[i].getPath();
+        assert fs.getFileStatus(partPath).isDir() : "partitions " + partPath
+            + " is not a directory !";
+
+        // generate a full partition specification
+        LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
+        Warehouse.makeSpecFromName(fullPartSpec, partPath);
+        fullPartSpecs.add(fullPartSpec);
+      }
+      return fullPartSpecs;
     } catch (IOException e) {
       throw new HiveException(e);
     }
@@ -1551,9 +1550,7 @@ public final class Utilities {
       columnNames.append(colInfo.getInternalName());
     }
     String columnNamesString = columnNames.toString();
-    jobConf.set(
-      Constants.LIST_COLUMNS,
-      columnNamesString);
+    jobConf.set(Constants.LIST_COLUMNS, columnNamesString);
   }
 
   public static void validatePartSpec(Table tbl, Map<String, String> partSpec)
@@ -1561,13 +1558,37 @@ public final class Utilities {
 
     List<FieldSchema> parts = tbl.getPartitionKeys();
     Set<String> partCols = new HashSet<String>(parts.size());
-    for (FieldSchema col: parts) {
+    for (FieldSchema col : parts) {
       partCols.add(col.getName());
     }
-    for (String col: partSpec.keySet()) {
+    for (String col : partSpec.keySet()) {
       if (!partCols.contains(col)) {
         throw new SemanticException(ErrorMsg.NONEXISTPARTCOL.getMsg(col));
       }
     }
   }
+
+  public static String suffix = ".hashtable";
+
+  public static String generatePath(String baseURI, Byte tag, String bigBucketFileName) {
+    String path = new String(baseURI + Path.SEPARATOR + "-" + tag + "-" + bigBucketFileName
+        + suffix);
+    return path;
+  }
+
+  public static String generateFileName(Byte tag, String bigBucketFileName) {
+    String fileName = new String("-" + tag + "-" + bigBucketFileName + suffix);
+    return fileName;
+  }
+
+  public static String generateTmpURI(String baseURI, String id) {
+    String tmpFileURI = new String(baseURI + Path.SEPARATOR + "HashTable-" + id);
+    return tmpFileURI;
+  }
+
+  public static String now() {
+    Calendar cal = Calendar.getInstance();
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
+    return sdf.format(cal.getTime());
+  }
 }
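
For orientation, the hashtable helpers added at the end of Utilities (generateFileName, generatePath, generateTmpURI) compose the dump-file name from the small-table tag and the big-table bucket file name. A minimal sketch of how they fit together; the baseURI, tag and bucket file name below are hypothetical values chosen only for illustration:

import org.apache.hadoop.hive.ql.exec.Utilities;

public class HashTablePathSketch {
  public static void main(String[] args) {
    // Hypothetical inputs: a scratch directory, the small-table tag and the big-table bucket file.
    String baseURI = "/tmp/hive-scratch/HashTable-stage-1";
    Byte tag = Byte.valueOf((byte) 1);
    String bigBucketFileName = "srcbucket0.txt";

    // Prints "-1-srcbucket0.txt.hashtable"
    System.out.println(Utilities.generateFileName(tag, bigBucketFileName));

    // Prints "/tmp/hive-scratch/HashTable-stage-1/-1-srcbucket0.txt.hashtable"
    System.out.println(Utilities.generatePath(baseURI, tag, bigBucketFileName));
  }
}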

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java?rev=1034276&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java Fri Nov 12 06:12:44 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Map Join Object used for the key.
+ */
+public abstract class AbstractMapJoinKey implements Externalizable {
+
+  protected static int metadataTag = -1;
+
+  public AbstractMapJoinKey() {
+  }
+
+  @Override
+  public abstract boolean equals(Object o);
+
+  @Override
+  public abstract int hashCode();
+
+  public abstract void readExternal(ObjectInput in) throws IOException, ClassNotFoundException;
+
+  public abstract void writeExternal(ObjectOutput out) throws IOException;
+
+  public abstract boolean hasAnyNulls();
+
+}
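
The contract of AbstractMapJoinKey is equality, hashing, Externalizable serialization and a null check over the key columns. A minimal, purely hypothetical subclass, shown only to illustrate that contract (it is not one of the classes added by this commit):

package org.apache.hadoop.hive.ql.exec.persistence;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

// Hypothetical single-column String key, for illustration only.
public class IllustrativeStringMapJoinKey extends AbstractMapJoinKey {

  private String key;

  public IllustrativeStringMapJoinKey() {
  }

  public IllustrativeStringMapJoinKey(String key) {
    this.key = key;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof IllustrativeStringMapJoinKey)) {
      return false;
    }
    String other = ((IllustrativeStringMapJoinKey) o).key;
    return key == null ? other == null : key.equals(other);
  }

  @Override
  public int hashCode() {
    return key == null ? 0 : key.hashCode();
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
    key = (String) in.readObject();
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    out.writeObject(key);
  }

  @Override
  public boolean hasAnyNulls() {
    // Reports whether any key column is null.
    return key == null;
  }
}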

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java?rev=1034276&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java Fri Nov 12 06:12:44 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public abstract class AbstractRowContainer<Row> {
+
+  public AbstractRowContainer() {
+
+  }
+
+  public abstract void add(Row t) throws HiveException;
+
+  public abstract Row first() throws HiveException;
+
+  public abstract Row next() throws HiveException;
+
+  /**
+   * Get the number of elements in the RowContainer.
+   *
+   * @return number of elements in the RowContainer
+   */
+
+  public abstract int size();
+
+  /**
+   * Remove all elements in the RowContainer.
+   */
+
+  public abstract void clear() throws HiveException;
+}
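
AbstractRowContainer defines a cursor-style interface: first() repositions at the beginning, next() advances, and both are assumed here to return null once the container is exhausted; the real implementations (e.g. MapJoinRowContainer) differ in detail. A minimal in-memory sketch under those assumptions:

package org.apache.hadoop.hive.ql.exec.persistence;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.metadata.HiveException;

// Hypothetical, purely in-memory container, for illustration only.
public class IllustrativeListRowContainer<Row> extends AbstractRowContainer<Row> {

  private final List<Row> rows = new ArrayList<Row>();
  private int cursor = 0;

  @Override
  public void add(Row t) throws HiveException {
    rows.add(t);
  }

  @Override
  public Row first() throws HiveException {
    // Reset the cursor and hand back the first element, or null when empty.
    cursor = 0;
    return next();
  }

  @Override
  public Row next() throws HiveException {
    // Assumed contract: return null once all rows have been handed out.
    if (cursor >= rows.size()) {
      return null;
    }
    return rows.get(cursor++);
  }

  @Override
  public int size() {
    return rows.size();
  }

  @Override
  public void clear() throws HiveException {
    rows.clear();
    cursor = 0;
  }
}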