Posted to commits@hive.apache.org by na...@apache.org on 2010/11/17 18:33:11 UTC

svn commit: r1036128 [2/19] - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/o...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Wed Nov 17 17:33:06 2010
@@ -42,17 +42,20 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc>
-    implements Serializable {
+
+public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> implements
+    Serializable {
   private static final long serialVersionUID = 1L;
-  private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class
-      .getName());
+  private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
 
   // from abstract map join operator
   /**
@@ -68,10 +71,8 @@ public class HashTableSinkOperator exten
    */
   protected transient Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
 
-  protected transient int posBigTableTag = -1; // one of the tables that is not
-                                               // in memory
-  protected transient int posBigTableAlias = -1; // one of the tables that is
-                                                 // not in memory
+  protected transient int posBigTableTag = -1; // one of the tables that is not in memory
+  protected transient int posBigTableAlias = -1; // one of the tables that is not in memory
   transient int mapJoinRowsKey; // rows for a given key
 
   protected transient RowContainer<ArrayList<Object>> emptyList = null;
@@ -114,6 +115,9 @@ public class HashTableSinkOperator exten
 
   private long rowNumber = 0;
   protected transient LogHelper console;
+  private long hashTableScale;
+  private boolean isAbort = false;
+
 
   public static class HashTableSinkObjectCtx {
     ObjectInspector standardOI;
@@ -125,8 +129,8 @@ public class HashTableSinkOperator exten
      * @param standardOI
      * @param serde
      */
-    public HashTableSinkObjectCtx(ObjectInspector standardOI, SerDe serde,
-        TableDesc tblDesc, Configuration conf) {
+    public HashTableSinkObjectCtx(ObjectInspector standardOI, SerDe serde, TableDesc tblDesc,
+        Configuration conf) {
       this.standardOI = standardOI;
       this.serde = serde;
       this.tblDesc = tblDesc;
@@ -157,25 +161,28 @@ public class HashTableSinkOperator exten
 
   }
 
+  private static final transient String[] FATAL_ERR_MSG = {
+      null, // counter value 0 means no error
+      "Mapside join size exceeds hive.mapjoin.maxsize. "
+          + "Please increase that or remove the mapjoin hint."};
   private final int metadataKeyTag = -1;
   transient int[] metadataValueTag;
   transient int maxMapJoinSize;
 
+
   public HashTableSinkOperator() {
-    // super();
-    console = new LogHelper(LOG, true);
   }
 
   public HashTableSinkOperator(MapJoinOperator mjop) {
     this.conf = new HashTableSinkDesc(mjop.getConf());
-    console = new LogHelper(LOG);
   }
 
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
-
-    maxMapJoinSize = HiveConf.getIntVar(hconf,
-        HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
+    boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
+    console = new LogHelper(LOG, isSilent);
+    maxMapJoinSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
 
     numMapRowsRead = 0;
     firstRow = true;
@@ -187,8 +194,7 @@ public class HashTableSinkOperator exten
 
     posBigTableAlias = order[posBigTableTag];
 
-    // initialize some variables, which used to be initialized in
-    // CommonJoinOperator
+    // initialize some variables, which used to be initialized in CommonJoinOperator
     numAliases = conf.getExprs().size();
     this.hconf = hconf;
     totalSz = 0;
@@ -197,28 +203,25 @@ public class HashTableSinkOperator exten
 
     // process join keys
     joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
-    JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), order,
-        posBigTableAlias);
-    joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(
-        joinKeys, inputObjInspectors, posBigTableAlias);
+    JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), order, posBigTableAlias);
+    joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
+        inputObjInspectors, posBigTableAlias);
     joinKeysStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
         joinKeysObjectInspectors, posBigTableAlias);
 
     // process join values
     joinValues = new HashMap<Byte, List<ExprNodeEvaluator>>();
-    JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), order,
-        posBigTableAlias);
-    joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(
-        joinValues, inputObjInspectors, posBigTableAlias);
+    JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), order, posBigTableAlias);
+    joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
+        inputObjInspectors, posBigTableAlias);
     joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
         joinValuesObjectInspectors, posBigTableAlias);
 
     // process join filters
     joinFilters = new HashMap<Byte, List<ExprNodeEvaluator>>();
-    JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(), order,
-        posBigTableAlias);
-    joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(
-        joinFilters, inputObjInspectors, posBigTableAlias);
+    JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(), order, posBigTableAlias);
+    joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters,
+        inputObjInspectors, posBigTableAlias);
 
     if (noOuterJoin) {
       rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors;
@@ -231,8 +234,7 @@ public class HashTableSinkOperator exten
         ArrayList<ObjectInspector> rcOIs = new ArrayList<ObjectInspector>();
         rcOIs.addAll(joinValuesObjectInspectors.get(alias));
         // for each alias, add object inspector for boolean as the last element
-        rcOIs
-            .add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
+        rcOIs.add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
         rowContainerObjectInspectors.put(alias, rcOIs);
       }
       rowContainerStandardObjectInspectors = getStandardObjectInspectors(rowContainerObjectInspectors);
@@ -245,52 +247,62 @@ public class HashTableSinkOperator exten
 
     mapJoinTables = new HashMap<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>>();
 
+    int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
+    float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
+        HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
+    float hashTableMaxMemoryUsage = HiveConf.getFloatVar(hconf,
+        HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
+    hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE);
+    if (hashTableScale <= 0) {
+      hashTableScale = 1;
+    }
+
     // initialize the hash tables for other tables
     for (Byte pos : order) {
       if (pos == posBigTableTag) {
         continue;
       }
 
-      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
+      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>(
+          hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage);
+
       mapJoinTables.put(pos, hashTable);
     }
   }
 
+
+
   protected static HashMap<Byte, List<ObjectInspector>> getStandardObjectInspectors(
       Map<Byte, List<ObjectInspector>> aliasToObjectInspectors) {
     HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
-    for (Entry<Byte, List<ObjectInspector>> oiEntry : aliasToObjectInspectors
-        .entrySet()) {
+    for (Entry<Byte, List<ObjectInspector>> oiEntry : aliasToObjectInspectors.entrySet()) {
       Byte alias = oiEntry.getKey();
       List<ObjectInspector> oiList = oiEntry.getValue();
-      ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(
-          oiList.size());
+      ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(oiList.size());
       for (int i = 0; i < oiList.size(); i++) {
-        fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList
-            .get(i), ObjectInspectorCopyOption.WRITABLE));
+        fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i),
+            ObjectInspectorCopyOption.WRITABLE));
       }
       result.put(alias, fieldOIList);
     }
     return result;
+
   }
 
-  public void generateMapMetaData() throws Exception {
+  private void setKeyMetaData() throws SerDeException {
     TableDesc keyTableDesc = conf.getKeyTblDesc();
-    SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
-        .getDeserializerClass(), null);
+    SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
+        null);
     keySerializer.initialize(null, keyTableDesc.getProperties());
 
     MapJoinMetaData.clear();
-    MapJoinMetaData.put(Integer.valueOf(metadataKeyTag),
-        new HashTableSinkObjectCtx(ObjectInspectorUtils
-            .getStandardObjectInspector(keySerializer.getObjectInspector(),
-                ObjectInspectorCopyOption.WRITABLE), keySerializer,
-            keyTableDesc, hconf));
+    MapJoinMetaData.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
+        ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
+            ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, hconf));
   }
 
   /*
-   * This operator only process small tables Read the key/value pairs Load them
-   * into hashtable
+   * This operator only process small tables Read the key/value pairs Load them into hashtable
    */
   @Override
   public void processOp(Object row, int tag) throws HiveException {
@@ -298,20 +310,20 @@ public class HashTableSinkOperator exten
     try {
       if (firstRow) {
         // generate the map metadata
-        generateMapMetaData();
+        setKeyMetaData();
         firstRow = false;
       }
       alias = order[tag];
       // alias = (byte)tag;
 
       // compute keys and values as StandardObjects
-      AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys
-          .get(alias), joinKeysObjectInspectors.get(alias));
+      AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys.get(alias),
+          joinKeysObjectInspectors.get(alias));
+
+      Object[] value = JoinUtil.computeMapJoinValues(row, joinValues.get(alias),
+          joinValuesObjectInspectors.get(alias), joinFilters.get(alias), joinFilterObjectInspectors
+              .get(alias), noOuterJoin);
 
-      Object[] value = JoinUtil.computeMapJoinValues(row,
-          joinValues.get(alias), joinValuesObjectInspectors.get(alias),
-          joinFilters.get(alias), joinFilterObjectInspectors.get(alias),
-          noOuterJoin);
 
       HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables
           .get((byte) tag);
@@ -326,24 +338,20 @@ public class HashTableSinkOperator exten
 
         if (metadataValueTag[tag] == -1) {
           metadataValueTag[tag] = order[tag];
-
-          TableDesc valueTableDesc = conf.getValueTblDescs().get(tag);
-          SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc
-              .getDeserializerClass(), null);
-          valueSerDe.initialize(null, valueTableDesc.getProperties());
-
-          MapJoinMetaData.put(Integer.valueOf(metadataValueTag[tag]),
-              new HashTableSinkObjectCtx(ObjectInspectorUtils
-                  .getStandardObjectInspector(valueSerDe.getObjectInspector(),
-                      ObjectInspectorCopyOption.WRITABLE), valueSerDe,
-                  valueTableDesc, hconf));
+          setValueMetaData(tag);
         }
 
         // Construct externalizable objects for key and value
         if (needNewKey) {
-          MapJoinObjectValue valueObj = new MapJoinObjectValue(
-              metadataValueTag[tag], res);
+          MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
+
           rowNumber++;
+          if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
+            isAbort = hashTable.isAbort(rowNumber, console);
+            if (isAbort) {
+              throw new HiveException("RunOutOfMeomoryUsage");
+            }
+          }
           hashTable.put(keyMap, valueObj);
         }
 
@@ -352,13 +360,34 @@ public class HashTableSinkOperator exten
         res.add(value);
       }
 
-    } catch (Exception e) {
-      e.printStackTrace();
+
+    } catch (SerDeException e) {
       throw new HiveException(e);
     }
 
   }
 
+  private void setValueMetaData(int tag) throws SerDeException {
+    TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(tag);
+    SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
+        null);
+
+    valueSerDe.initialize(null, valueTableDesc.getProperties());
+
+    List<ObjectInspector> newFields = rowContainerStandardObjectInspectors.get((Byte) alias);
+    int length = newFields.size();
+    List<String> newNames = new ArrayList<String>(length);
+    for (int i = 0; i < length; i++) {
+      String tmp = new String("tmp_" + i);
+      newNames.add(tmp);
+    }
+    StandardStructObjectInspector standardOI = ObjectInspectorFactory
+        .getStandardStructObjectInspector(newNames, newFields);
+
+    MapJoinMetaData.put(Integer.valueOf(metadataValueTag[tag]), new HashTableSinkObjectCtx(
+        standardOI, valueSerDe, valueTableDesc, hconf));
+  }
+
   @Override
   public void closeOp(boolean abort) throws HiveException {
     try {
@@ -371,31 +400,24 @@ public class HashTableSinkOperator exten
             .entrySet()) {
           // get the key and value
           Byte tag = hashTables.getKey();
-          HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = hashTables
-              .getValue();
+          HashMapWrapper hashTable = hashTables.getValue();
 
           // get current input file name
-          String bigBucketFileName = this.getExecContext()
-              .getCurrentBigBucketFile();
+          String bigBucketFileName = this.getExecContext().getCurrentBigBucketFile();
           if (bigBucketFileName == null || bigBucketFileName.length() == 0) {
             bigBucketFileName = "-";
           }
           // get the tmp URI path; it will be a hdfs path if not local mode
-          String tmpURIPath = Utilities.generatePath(tmpURI, tag,
-              bigBucketFileName);
-          console.printInfo(Utilities.now()
-              + "\tDump the hashtable into file: " + tmpURIPath);
+          String tmpURIPath = PathUtil.generatePath(tmpURI, tag, bigBucketFileName);
+          hashTable.isAbort(rowNumber, console);
+          console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
           // get the hashtable file and path
           Path path = new Path(tmpURIPath);
           FileSystem fs = path.getFileSystem(hconf);
           File file = new File(path.toUri().getPath());
           fs.create(path);
-
           fileLength = hashTable.flushMemoryCacheToPersistent(file);
-
-          console.printInfo(Utilities.now() + "\t Processing rows: "
-              + rowNumber + "\t key number:" + hashTable.size());
-          console.printInfo("Upload 1 File to: " + tmpURIPath + " File size: "
+          console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
               + fileLength);
 
           hashTable.close();
@@ -411,7 +433,7 @@ public class HashTableSinkOperator exten
 
   /**
    * Implements the getName function for the Node Interface.
-   * 
+   *
    * @return the name of the operator
    */
   @Override
@@ -424,4 +446,6 @@ public class HashTableSinkOperator exten
     return OperatorType.HASHTABLESINK;
   }
 
+
+
 }
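
Note on the hunk above: the new abort logic in processOp only probes the heap every hashTableScale rows (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) and throws when usage crosses the configured ceiling. Below is a minimal standalone sketch of that check; the class and field names and the 0.90 ceiling are illustrative assumptions, while the real operator reads both values from HiveConf (HIVEHASHTABLESCALE, HIVEHASHTABLEMAXMEMORYUSAGE).

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

// Sketch of the throttled memory probe added to HashTableSinkOperator.processOp.
// Names and the 0.90 ceiling are assumptions, not values from the patch.
public class MemoryCheckSketch {
  private static final MemoryMXBean MX_BEAN = ManagementFactory.getMemoryMXBean();

  private final long checkInterval;      // plays the role of hashTableScale
  private final double maxUsageFraction; // plays the role of hashTableMaxMemoryUsage

  public MemoryCheckSketch(long checkInterval, double maxUsageFraction) {
    this.checkInterval = checkInterval <= 0 ? 1 : checkInterval; // same floor as initializeOp
    this.maxUsageFraction = maxUsageFraction;
  }

  /** Returns true when loading the small table should be aborted. */
  public boolean shouldAbort(long rowNumber) {
    // Only look at the heap every checkInterval rows, mirroring processOp.
    if (rowNumber <= checkInterval || rowNumber % checkInterval != 0) {
      return false;
    }
    long used = MX_BEAN.getHeapMemoryUsage().getUsed();
    long max = MX_BEAN.getHeapMemoryUsage().getMax();
    return ((double) used / (double) max) > maxUsageFraction;
  }

  public static void main(String[] args) {
    MemoryCheckSketch check = new MemoryCheckSketch(100000, 0.90);
    System.out.println("abort at row 200000? " + check.shouldAbort(200000));
  }
}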

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Wed Nov 17 17:33:06 2010
@@ -409,4 +409,7 @@ public class MapRedTask extends ExecDriv
 
     return null;
   }
+
+
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Wed Nov 17 17:33:06 2010
@@ -57,7 +57,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class MapredLocalTask  extends Task<MapredLocalWork> implements Serializable {
+public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {
 
   private Map<String, FetchOperator> fetchOperators;
   private JobConf job;
@@ -67,11 +67,11 @@ public class MapredLocalTask  extends Ta
   static final String[] HIVE_SYS_PROP = {"build.dir", "build.dir.hive"};
   public static MemoryMXBean memoryMXBean;
 
-    // not sure we need this exec context; but all the operators in the work
+  // not sure we need this exec context; but all the operators in the work
   // will pass this context throught
   private final ExecMapperContext execContext = new ExecMapperContext();
 
-  public MapredLocalTask(){
+  public MapredLocalTask() {
     super();
   }
 
@@ -83,25 +83,23 @@ public class MapredLocalTask  extends Ta
   }
 
   @Override
-  public void initialize(HiveConf conf, QueryPlan queryPlan,
-      DriverContext driverContext) {
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
     super.initialize(conf, queryPlan, driverContext);
     job = new JobConf(conf, ExecDriver.class);
   }
 
-  public static String now(){
+  public static String now() {
     Calendar cal = Calendar.getInstance();
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss");
-    return  sdf.format(cal.getTime());
+    return sdf.format(cal.getTime());
   }
 
 
 
   @Override
-  public int execute(DriverContext driverContext){
-    try{
-      //generate the cmd line to run in the child jvm
-      //String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
+  public int execute(DriverContext driverContext) {
+    try {
+      // generate the cmd line to run in the child jvm
       Context ctx = driverContext.getCtx();
       String hiveJar = conf.getJar();
 
@@ -115,16 +113,15 @@ public class MapredLocalTask  extends Ta
       LOG.info("Generating plan file " + planPath.toString());
       Utilities.serializeMapRedLocalWork(plan, out);
 
-      String isSilent = "true".equalsIgnoreCase(System
-          .getProperty("test.silent")) ? "-nolog" : "";
+      String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent")) ? "-nolog" : "";
 
       String jarCmd;
 
-      jarCmd = hiveJar + " " + ExecDriver.class.getName() ;
+      jarCmd = hiveJar + " " + ExecDriver.class.getName();
 
       String hiveConfArgs = ExecDriver.generateCmdLine(conf);
-      String cmdLine = hadoopExec + " jar " + jarCmd + " -localtask -plan "
-          + planPath.toString() + " " + isSilent + " " + hiveConfArgs;
+      String cmdLine = hadoopExec + " jar " + jarCmd + " -localtask -plan " + planPath.toString()
+          + " " + isSilent + " " + hiveConfArgs;
 
       String workDir = (new File(".")).getCanonicalPath();
       String files = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.FILE);
@@ -134,16 +131,16 @@ public class MapredLocalTask  extends Ta
 
         workDir = (new Path(ctx.getLocalTmpFileURI())).toUri().getPath();
 
-        if (! (new File(workDir)).mkdir()) {
-          throw new IOException ("Cannot create tmp working dir: " + workDir);
+        if (!(new File(workDir)).mkdir()) {
+          throw new IOException("Cannot create tmp working dir: " + workDir);
         }
 
-        for (String f: StringUtils.split(files, ',')) {
+        for (String f : StringUtils.split(files, ',')) {
           Path p = new Path(f);
           String target = p.toUri().getPath();
           String link = workDir + Path.SEPARATOR + p.getName();
           if (FileUtil.symLink(target, link) != 0) {
-            throw new IOException ("Cannot link to added file: " + target + " from: " + link);
+            throw new IOException("Cannot link to added file: " + target + " from: " + link);
           }
         }
       }
@@ -166,31 +163,30 @@ public class MapredLocalTask  extends Ta
       Map<String, String> variables = new HashMap(System.getenv());
       // The user can specify the hadoop memory
 
-      //if ("local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT))) {
-        // if we are running in local mode - then the amount of memory used
-        // by the child jvm can no longer default to the memory used by the
-        // parent jvm
-        //int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
-      int hadoopMem= conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);;
+      // if ("local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT))) {
+      // if we are running in local mode - then the amount of memory used
+      // by the child jvm can no longer default to the memory used by the
+      // parent jvm
+      // int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
+      int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
       if (hadoopMem == 0) {
         // remove env var that would default child jvm to use parent's memory
         // as default. child jvm would use default memory for a hadoop client
         variables.remove(HADOOP_MEM_KEY);
       } else {
         // user specified the memory for local mode hadoop run
-        console.printInfo(" set heap size\t"+hadoopMem+"MB");
+        console.printInfo(" set heap size\t" + hadoopMem + "MB");
         variables.put(HADOOP_MEM_KEY, String.valueOf(hadoopMem));
       }
-      //} else {
-        // nothing to do - we are not running in local mode - only submitting
-        // the job via a child process. in this case it's appropriate that the
-        // child jvm use the same memory as the parent jvm
+      // } else {
+      // nothing to do - we are not running in local mode - only submitting
+      // the job via a child process. in this case it's appropriate that the
+      // child jvm use the same memory as the parent jvm
 
-      //}
+      // }
 
       if (variables.containsKey(HADOOP_OPTS_KEY)) {
-        variables.put(HADOOP_OPTS_KEY, variables.get(HADOOP_OPTS_KEY)
-            + hadoopOpts);
+        variables.put(HADOOP_OPTS_KEY, variables.get(HADOOP_OPTS_KEY) + hadoopOpts);
       } else {
         variables.put(HADOOP_OPTS_KEY, hadoopOpts);
       }
@@ -205,10 +201,8 @@ public class MapredLocalTask  extends Ta
       // Run ExecDriver in another JVM
       executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
 
-      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
-          null, System.out);
-      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
-          null, System.err);
+      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
+      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
 
       outPrinter.start();
       errPrinter.start();
@@ -217,10 +211,9 @@ public class MapredLocalTask  extends Ta
 
       if (exitVal != 0) {
         LOG.error("Execution failed with exit status: " + exitVal);
-        console.printError("Mapred Local Task Failed. Give up the map join stragery");
       } else {
         LOG.info("Execution completed successfully");
-        console.printInfo("Mapred Local Task Running Successfully . Keep using map join stragery");
+        console.printInfo("Mapred Local Task Succeeded . Convert the Join into MapJoin");
       }
 
       return exitVal;
@@ -233,54 +226,56 @@ public class MapredLocalTask  extends Ta
 
 
 
-  public int executeFromChildJVM(DriverContext driverContext){
-
+  public int executeFromChildJVM(DriverContext driverContext) {
     // check the local work
-    if(work == null){
+    if (work == null) {
       return -1;
     }
     memoryMXBean = ManagementFactory.getMemoryMXBean();
-    console.printInfo(Utilities.now()+"\tStarting to luaunch local task to process map join ");
-    console.printInfo("\tmaximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+    long startTime = System.currentTimeMillis();
+    console.printInfo(Utilities.now()
+        + "\tStarting to luaunch local task to process map join;\tmaximum memory = "
+        + memoryMXBean.getHeapMemoryUsage().getMax());
     fetchOperators = new HashMap<String, FetchOperator>();
     Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
     execContext.setJc(job);
-    //set the local work, so all the operator can get this context
+    // set the local work, so all the operator can get this context
     execContext.setLocalWork(work);
     boolean inputFileChangeSenstive = work.getInputFileChangeSensitive();
-    try{
+    try {
 
       initializeOperators(fetchOpJobConfMap);
-      //for each big table's bucket, call the start forward
-      if(inputFileChangeSenstive){
-        for( LinkedHashMap<String, ArrayList<String>> bigTableBucketFiles:
-          work.getBucketMapjoinContext().getAliasBucketFileNameMapping().values()){
-          for(String bigTableBucket: bigTableBucketFiles.keySet()){
-            startForward(inputFileChangeSenstive,bigTableBucket);
+      // for each big table's bucket, call the start forward
+      if (inputFileChangeSenstive) {
+        for (LinkedHashMap<String, ArrayList<String>> bigTableBucketFiles : work
+            .getBucketMapjoinContext().getAliasBucketFileNameMapping().values()) {
+          for (String bigTableBucket : bigTableBucketFiles.keySet()) {
+            startForward(inputFileChangeSenstive, bigTableBucket);
           }
         }
-      }else{
-        startForward(inputFileChangeSenstive,null);
+      } else {
+        startForward(inputFileChangeSenstive, null);
       }
-      console.printInfo(now()+"\tEnd of local task ");
+      long currentTime = System.currentTimeMillis();
+      long elapsed = currentTime - startTime;
+      console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: "
+          + Utilities.showTime(elapsed) + " sec.");
     } catch (Throwable e) {
-      if (e instanceof OutOfMemoryError) {
+      if (e instanceof OutOfMemoryError
+          || (e instanceof HiveException && e.getMessage().equals("RunOutOfMeomoryUsage"))) {
         // Don't create a new object if we are already out of memory
-        l4j.error("Out of Memory Error");
-        console.printError("[Warning] Small table is too large to put into memory");
-        return 2;
+        return 3;
       } else {
         l4j.error("Hive Runtime Error: Map local work failed");
         e.printStackTrace();
+        return 2;
       }
-    }finally{
-      console.printInfo(Utilities.now()+"\tFinish running local task");
     }
     return 0;
   }
 
   private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
-    throws Exception{
+      throws Exception {
     for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
       int fetchOpRows = 0;
       String alias = entry.getKey();
@@ -288,17 +283,17 @@ public class MapredLocalTask  extends Ta
 
       if (inputFileChangeSenstive) {
         fetchOp.clearFetchContext();
-        setUpFetchOpContext(fetchOp, alias,bigTableBucket);
+        setUpFetchOpContext(fetchOp, alias, bigTableBucket);
       }
 
-      //get the root operator
+      // get the root operator
       Operator<? extends Serializable> forwardOp = work.getAliasToWork().get(alias);
-      //walk through the operator tree
+      // walk through the operator tree
       while (true) {
         InspectableObject row = fetchOp.getNextRow();
         if (row == null) {
           if (inputFileChangeSenstive) {
-            String fileName=this.getFileName(bigTableBucket);
+            String fileName = this.getFileName(bigTableBucket);
             execContext.setCurrentBigBucketFile(fileName);
             forwardOp.reset();
           }
@@ -310,22 +305,23 @@ public class MapredLocalTask  extends Ta
         // check if any operator had a fatal error or early exit during
         // execution
         if (forwardOp.getDone()) {
-          //ExecMapper.setDone(true);
+          // ExecMapper.setDone(true);
           break;
         }
       }
     }
   }
+
   private void initializeOperators(Map<FetchOperator, JobConf> fetchOpJobConfMap)
-    throws HiveException{
+      throws HiveException {
     // this mapper operator is used to initialize all the operators
     for (Map.Entry<String, FetchWork> entry : work.getAliasToFetchWork().entrySet()) {
       JobConf jobClone = new JobConf(job);
 
       Operator<? extends Serializable> tableScan = work.getAliasToWork().get(entry.getKey());
       boolean setColumnsNeeded = false;
-      if(tableScan instanceof TableScanOperator) {
-        ArrayList<Integer> list = ((TableScanOperator)tableScan).getNeededColumnIDs();
+      if (tableScan instanceof TableScanOperator) {
+        ArrayList<Integer> list = ((TableScanOperator) tableScan).getNeededColumnIDs();
         if (list != null) {
           ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
           setColumnsNeeded = true;
@@ -336,18 +332,18 @@ public class MapredLocalTask  extends Ta
         ColumnProjectionUtils.setFullyReadColumns(jobClone);
       }
 
-      //create a fetch operator
-      FetchOperator fetchOp = new FetchOperator(entry.getValue(),jobClone);
+      // create a fetch operator
+      FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone);
       fetchOpJobConfMap.put(fetchOp, jobClone);
       fetchOperators.put(entry.getKey(), fetchOp);
       l4j.info("fetchoperator for " + entry.getKey() + " created");
     }
-    //initilize all forward operator
+    // initilize all forward operator
     for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-      //get the forward op
+      // get the forward op
       Operator<? extends Serializable> forwardOp = work.getAliasToWork().get(entry.getKey());
 
-      //put the exe context into all the operators
+      // put the exe context into all the operators
       forwardOp.setExecContext(execContext);
       // All the operators need to be initialized before process
       FetchOperator fetchOp = entry.getValue();
@@ -356,54 +352,58 @@ public class MapredLocalTask  extends Ta
       if (jobConf == null) {
         jobConf = job;
       }
-      //initialize the forward operator
+      // initialize the forward operator
       forwardOp.initialize(jobConf, new ObjectInspector[] {fetchOp.getOutputObjectInspector()});
       l4j.info("fetchoperator for " + entry.getKey() + " initialized");
     }
   }
 
 
-  private void setUpFetchOpContext(FetchOperator fetchOp, String alias,String currentInputFile)
-  throws Exception {
+  private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile)
+      throws Exception {
 
-    BucketMapJoinContext bucketMatcherCxt = this.work
-        .getBucketMapjoinContext();
+    BucketMapJoinContext bucketMatcherCxt = this.work.getBucketMapjoinContext();
 
-    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
-        .getBucketMatcherClass();
-    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
-        bucketMatcherCls, null);
-    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
-        .getAliasBucketFileNameMapping());
+    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass();
+    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(bucketMatcherCls,
+        null);
+    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt.getAliasBucketFileNameMapping());
 
-    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
-        bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
+    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile, bucketMatcherCxt
+        .getMapJoinBigTableAlias(), alias);
     Iterator<Path> iter = aliasFiles.iterator();
     fetchOp.setupContext(iter, null);
   }
 
-  private String getFileName(String path){
-    if(path== null || path.length()==0) {
+  private String getFileName(String path) {
+    if (path == null || path.length() == 0) {
       return null;
     }
 
-    int last_separator = path.lastIndexOf(Path.SEPARATOR)+1;
+    int last_separator = path.lastIndexOf(Path.SEPARATOR) + 1;
     String fileName = path.substring(last_separator);
     return fileName;
 
   }
+
   @Override
-  public void localizeMRTmpFilesImpl(Context ctx){
+  public void localizeMRTmpFilesImpl(Context ctx) {
 
   }
 
   @Override
+  public boolean isMapRedLocalTask() {
+    return true;
+  }
+
+  @Override
   public String getName() {
     return "MAPREDLOCAL";
   }
+
   @Override
   public int getType() {
-    //assert false;
+    // assert false;
     return StageType.MAPREDLOCAL;
   }
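
For reference, executeFromChildJVM above now distinguishes its outcomes by exit code: -1 when no local work is attached, 3 when the small table exhausts the child JVM's memory (OutOfMemoryError, or the HiveException raised by HashTableSinkOperator), 2 for any other runtime failure, and 0 on success. The sketch below is one hedged way a caller might map those codes; the class and message wording are illustrative, only the numeric codes come from the patch.

// Illustrative mapping of MapredLocalTask's new exit codes (assumed helper, not in the patch).
public class LocalTaskExitCodes {
  public static final int SUCCESS = 0;
  public static final int RUNTIME_FAILURE = 2;
  public static final int OUT_OF_MEMORY = 3;

  public static String describe(int exitCode) {
    switch (exitCode) {
      case SUCCESS:
        return "local hash table build succeeded; keep the map join";
      case OUT_OF_MEMORY:
        return "small table did not fit in memory; likely fall back to the backup (common join) task";
      case RUNTIME_FAILURE:
        return "local task failed for another reason";
      default:
        return "unexpected exit code " + exitCode;
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(OUT_OF_MEMORY));
  }
}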
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Nov 17 17:33:06 2010
@@ -194,12 +194,54 @@ public final class OperatorFactory {
    * Returns an operator given the conf and a list of parent operators.
    */
   public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+      List<Operator<? extends Serializable>> oplist) {
+    Operator<T> ret = get((Class<T>) conf.getClass());
+    ret.setConf(conf);
+    if (oplist.size() == 0) {
+      return (ret);
+    }
+
+    // Add the new operator as child of each of the passed in operators
+    for (Operator op : oplist) {
+      List<Operator> children = op.getChildOperators();
+      if (children == null) {
+        children = new ArrayList<Operator>();
+      }
+      children.add(ret);
+      op.setChildOperators(children);
+    }
+
+    // add parents for the newly created operator
+    List<Operator<? extends Serializable>> parent = new ArrayList<Operator<? extends Serializable>>();
+    for (Operator op : oplist) {
+      parent.add(op);
+    }
+
+    ret.setParentOperators(parent);
+
+    return (ret);
+  }
+
+  /**
+   * Returns an operator given the conf and a list of parent operators.
+   */
+  public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
       RowSchema rwsch, Operator... oplist) {
     Operator<T> ret = getAndMakeChild(conf, oplist);
     ret.setSchema(rwsch);
     return (ret);
   }
 
+  /**
+   * Returns an operator given the conf and a list of parent operators.
+   */
+  public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+      RowSchema rwsch, List<Operator<? extends Serializable>> oplist) {
+    Operator<T> ret = getAndMakeChild(conf, oplist);
+    ret.setSchema(rwsch);
+    return (ret);
+  }
+
   private OperatorFactory() {
     // prevent instantiation
   }
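
The first new overload above wires a single operator under an arbitrary list of parents: it appends the new operator to every parent's child list and records all parents on the new operator. A self-contained sketch of that wiring with a toy node type (the Node class below is illustrative, not Hive's Operator):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration of the parent/child wiring done by the new
// getAndMakeChild(conf, List<Operator<...>>) overload; Node stands in for Operator.
public class MakeChildSketch {
  static class Node {
    final String name;
    final List<Node> children = new ArrayList<Node>();
    final List<Node> parents = new ArrayList<Node>();
    Node(String name) { this.name = name; }
  }

  static Node makeChild(String name, List<Node> parentList) {
    Node child = new Node(name);
    for (Node parent : parentList) {
      parent.children.add(child); // new operator becomes a child of each parent
      child.parents.add(parent);  // and remembers every parent
    }
    return child;
  }

  public static void main(String[] args) {
    Node scanA = new Node("TS_a");
    Node scanB = new Node("TS_b");
    Node sink = makeChild("HASHTABLESINK", Arrays.asList(scanA, scanB));
    System.out.println(sink.name + " has " + sink.parents.size() + " parents");
  }
}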

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java?rev=1036128&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java Wed Nov 17 17:33:06 2010
@@ -0,0 +1,20 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.fs.Path;
+
+public class PathUtil {
+  public static String suffix=".hashtable";
+  public static String generatePath(String baseURI,Byte tag,String bigBucketFileName){
+    String path = new String(baseURI+Path.SEPARATOR+"-"+tag+"-"+bigBucketFileName+suffix);
+    return path;
+  }
+  public static String generateFileName(Byte tag,String bigBucketFileName){
+    String fileName = new String("-"+tag+"-"+bigBucketFileName+suffix);
+    return fileName;
+  }
+
+  public static String generateTmpURI(String baseURI,String id){
+    String tmpFileURI = new String(baseURI+Path.SEPARATOR+"HashTable-"+id);
+    return tmpFileURI;
+  }
+}
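
Assuming the class above is on the classpath, the helpers compose hashtable dump paths as shown below; the base URI, stage id, and bucket file name are made-up example values.

import org.apache.hadoop.hive.ql.exec.PathUtil;

// Usage example for the new PathUtil helpers; all argument values are made up.
public class PathUtilExample {
  public static void main(String[] args) {
    String tmpURI = PathUtil.generateTmpURI("/tmp/hive-user", "stage-3");
    String dumpPath = PathUtil.generatePath(tmpURI, (byte) 1, "srcbucket0.txt");
    // prints /tmp/hive-user/HashTable-stage-3/-1-srcbucket0.txt.hashtable
    System.out.println(dumpPath);
    // prints -1-srcbucket0.txt.hashtable
    System.out.println(PathUtil.generateFileName((byte) 1, "srcbucket0.txt"));
  }
}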

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Wed Nov 17 17:33:06 2010
@@ -42,8 +42,7 @@ import org.apache.hadoop.util.StringUtil
  * Task implementation.
  **/
 
-public abstract class Task<T extends Serializable> implements Serializable,
-    Node {
+public abstract class Task<T extends Serializable> implements Serializable, Node {
 
   private static final long serialVersionUID = 1L;
   protected transient boolean started;
@@ -59,6 +58,9 @@ public abstract class Task<T extends Ser
   protected transient HashMap<String, Long> taskCounters;
   protected transient DriverContext driverContext;
   protected transient boolean clonedConf = false;
+  protected Task<? extends Serializable> backupTask;
+  protected List<Task<? extends Serializable>> backupChildrenTasks = new ArrayList<Task<? extends Serializable>>();
+
 
   // Descendants tasks who subscribe feeds from this task
   protected transient List<Task<? extends Serializable>> feedSubscribers;
@@ -81,8 +83,7 @@ public abstract class Task<T extends Ser
     this.taskCounters = new HashMap<String, Long>();
   }
 
-  public void initialize(HiveConf conf, QueryPlan queryPlan,
-      DriverContext driverContext) {
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
     this.queryPlan = queryPlan;
     isdone = false;
     started = false;
@@ -103,8 +104,8 @@ public abstract class Task<T extends Ser
   }
 
   /**
-   * This method is called in the Driver on every task. It updates counters and
-   * calls execute(), which is overridden in each task
+   * This method is called in the Driver on every task. It updates counters and calls execute(),
+   * which is overridden in each task
    *
    * @return return value of execute()
    */
@@ -127,8 +128,7 @@ public abstract class Task<T extends Ser
   }
 
   /**
-   * This method is overridden in each Task. TODO execute should return a
-   * TaskHandle.
+   * This method is overridden in each Task. TODO execute should return a TaskHandle.
    *
    * @return status of executing the task
    */
@@ -160,10 +160,61 @@ public abstract class Task<T extends Ser
     return parentTasks;
   }
 
+  public Task<? extends Serializable> getBackupTask() {
+    return backupTask;
+  }
+
+
+  public void setBackupTask(Task<? extends Serializable> backupTask) {
+    this.backupTask = backupTask;
+  }
+
+  public List<Task<? extends Serializable>> getBackupChildrenTasks() {
+    return backupChildrenTasks;
+  }
+
+  public void setBackupChildrenTasks(List<Task<? extends Serializable>> backupChildrenTasks) {
+    this.backupChildrenTasks = backupChildrenTasks;
+  }
+
+  public Task<? extends Serializable> getAndInitBackupTask() {
+    if (backupTask != null) {
+      // first set back the backup task with its children task.
+      for (Task<? extends Serializable> backupChild : backupChildrenTasks) {
+        backupChild.getParentTasks().add(backupTask);
+      }
+
+      // recursively remove task from its children tasks if this task doesn't have any parent task
+      this.removeFromChildrenTasks();
+    }
+    return backupTask;
+  }
+
+  public void removeFromChildrenTasks() {
+
+    List<Task<? extends Serializable>> childrenTasks = this.getChildTasks();
+    if (childrenTasks == null) {
+      return;
+    }
+
+    for (Task<? extends Serializable> childTsk : childrenTasks) {
+      // remove this task from its children tasks
+      childTsk.getParentTasks().remove(this);
+
+      // recursively remove non-parent task from its children
+      List<Task<? extends Serializable>> siblingTasks = childTsk.getParentTasks();
+      if (siblingTasks == null || siblingTasks.size() == 0) {
+        childTsk.removeFromChildrenTasks();
+      }
+    }
+
+    return;
+  }
+
+
   /**
-   * The default dependent tasks are just child tasks, but different types
-   * could implement their own (e.g. ConditionalTask will use the listTasks
-   * as dependents).
+   * The default dependent tasks are just child tasks, but different types could implement their own
+   * (e.g. ConditionalTask will use the listTasks as dependents).
    *
    * @return a list of tasks that are dependent on this task.
    */
@@ -172,8 +223,8 @@ public abstract class Task<T extends Ser
   }
 
   /**
-   * Add a dependent task on the current task. Return if the dependency already
-   * existed or is this a new one
+   * Add a dependent task on the current task. Return if the dependency already existed or is this a
+   * new one
    *
    * @return true if the task got added false if it already existed
    */
@@ -204,8 +255,7 @@ public abstract class Task<T extends Ser
   public void removeDependentTask(Task<? extends Serializable> dependent) {
     if ((getChildTasks() != null) && (getChildTasks().contains(dependent))) {
       getChildTasks().remove(dependent);
-      if ((dependent.getParentTasks() != null)
-          && (dependent.getParentTasks().contains(this))) {
+      if ((dependent.getParentTasks() != null) && (dependent.getParentTasks().contains(this))) {
         dependent.getParentTasks().remove(this);
       }
     }
@@ -279,6 +329,10 @@ public abstract class Task<T extends Ser
     return false;
   }
 
+  public boolean isMapRedLocalTask() {
+    return false;
+  }
+
   public boolean hasReduce() {
     return false;
   }
@@ -288,8 +342,7 @@ public abstract class Task<T extends Ser
   }
 
   /**
-   * Should be overridden to return the type of the specific task among the
-   * types in TaskType.
+   * Should be overridden to return the type of the specific task among the types in TaskType.
    *
    * @return TaskTypeType.* or -1 if not overridden
    */
@@ -299,21 +352,23 @@ public abstract class Task<T extends Ser
   }
 
   /**
-   * If this task uses any map-reduce intermediate data (either for reading
-   * or for writing), localize them (using the supplied Context). Map-Reduce
-   * intermediate directories are allocated using Context.getMRTmpFileURI()
-   * and can be localized using localizeMRTmpFileURI().
+   * If this task uses any map-reduce intermediate data (either for reading or for writing),
+   * localize them (using the supplied Context). Map-Reduce intermediate directories are allocated
+   * using Context.getMRTmpFileURI() and can be localized using localizeMRTmpFileURI().
    *
-   * This method is declared abstract to force any task code to explicitly
-   * deal with this aspect of execution.
+   * This method is declared abstract to force any task code to explicitly deal with this aspect of
+   * execution.
    *
-   * @param ctx context object with which to localize
+   * @param ctx
+   *          context object with which to localize
    */
   abstract protected void localizeMRTmpFilesImpl(Context ctx);
 
   /**
    * Localize a task tree
-   * @param ctx context object with which to localize
+   *
+   * @param ctx
+   *          context object with which to localize
    */
   public final void localizeMRTmpFiles(Context ctx) {
     localizeMRTmpFilesImpl(ctx);
@@ -322,7 +377,7 @@ public abstract class Task<T extends Ser
       return;
     }
 
-    for (Task<? extends Serializable> t: childTasks) {
+    for (Task<? extends Serializable> t : childTasks) {
       t.localizeMRTmpFiles(ctx);
     }
   }
@@ -330,12 +385,13 @@ public abstract class Task<T extends Ser
   /**
    * Subscribe the feed of publisher. To prevent cycles, a task can only subscribe to its ancestor.
    * Feed is a generic form of execution-time feedback (type, value) pair from one task to another
-   * task. Examples include dynamic partitions (which are only available at execution time).
-   * The MoveTask may pass the list of dynamic partitions to the StatsTask since after the
-   * MoveTask the list of dynamic partitions are lost (MoveTask moves them to the table's
-   * destination directory which is mixed with old partitions).
+   * task. Examples include dynamic partitions (which are only available at execution time). The
+   * MoveTask may pass the list of dynamic partitions to the StatsTask since after the MoveTask the
+   * list of dynamic partitions are lost (MoveTask moves them to the table's destination directory
+   * which is mixed with old partitions).
    *
-   * @param publisher this feed provider.
+   * @param publisher
+   *          this feed provider.
    */
   public void subscribeFeed(Task<? extends Serializable> publisher) {
     if (publisher != this && publisher.ancestorOrSelf(this)) {
@@ -353,7 +409,7 @@ public abstract class Task<T extends Ser
     }
     List<Task<? extends Serializable>> deps = getDependentTasks();
     if (deps != null) {
-      for (Task<? extends Serializable> d: deps) {
+      for (Task<? extends Serializable> d : deps) {
         if (d.ancestorOrSelf(desc)) {
           return true;
         }
@@ -373,7 +429,7 @@ public abstract class Task<T extends Ser
   // push the feed to its subscribers
   protected void pushFeed(FeedType feedType, Object feedValue) {
     if (feedSubscribers != null) {
-      for (Task<? extends Serializable> s: feedSubscribers) {
+      for (Task<? extends Serializable> s : feedSubscribers) {
         s.receiveFeed(feedType, feedValue);
       }
     }
@@ -383,10 +439,10 @@ public abstract class Task<T extends Ser
   protected void receiveFeed(FeedType feedType, Object feedValue) {
   }
 
-  protected void cloneConf () {
+  protected void cloneConf() {
     if (!clonedConf) {
       clonedConf = true;
       conf = new HiveConf(conf);
     }
   }
-}
\ No newline at end of file
+}
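
The backup-task fields plus getAndInitBackupTask/removeFromChildrenTasks added above let a failed task hand control to a pre-registered alternative. A hedged sketch of how a caller might use that API; the helper class and its control flow are assumptions, only the Task method comes from the patch.

import java.io.Serializable;

import org.apache.hadoop.hive.ql.exec.Task;

// Sketch only: this helper is hypothetical, but getAndInitBackupTask() is the new Task API above.
public class BackupTaskSketch {
  /**
   * Picks the task to run after 'failed' could not complete: its registered backup task
   * (re-linked to the backup children and detached from the failed task's subtree),
   * or null when no backup was registered.
   */
  public static Task<? extends Serializable> nextTaskAfterFailure(Task<? extends Serializable> failed) {
    return failed.getAndInitBackupTask();
  }
}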

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Wed Nov 17 17:33:06 2010
@@ -73,7 +73,8 @@ public final class TaskFactory {
     taskvec.add(new taskTuple<MapredLocalWork>(MapredLocalWork.class,
         MapredLocalTask.class));
     taskvec.add(new taskTuple<StatsWork>(StatsWork.class,
-        StatsTask.class));        
+        StatsTask.class));
+
 
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Nov 17 17:33:06 2010
@@ -49,6 +49,7 @@ import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -1591,4 +1592,9 @@ public final class Utilities {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
     return sdf.format(cal.getTime());
   }
+
+  public static String showTime(long time) {
+    SimpleDateFormat sdf = new SimpleDateFormat("ss");
+    return sdf.format(new Date(time));
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Wed Nov 17 17:33:06 2010
@@ -33,6 +33,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
@@ -43,20 +44,20 @@ import org.apache.hadoop.hive.ql.session
  * the main memory hash table exceeds a certain threshold, new elements will go into the persistent
  * hash table.
  */
+
 public class HashMapWrapper<K, V> implements Serializable {
 
+  private static final long serialVersionUID = 1L;
   protected Log LOG = LogFactory.getLog(this.getClass().getName());
 
   // default threshold for using main memory based HashMap
+
   private static final int THRESHOLD = 1000000;
   private static final float LOADFACTOR = 0.75f;
+  private static final float MEMORYUSAGE = 1;
 
-  private double threshold; // threshold to put data into persistent hash table
-  // instead
+  private float maxMemoryUsage;
   private HashMap<K, V> mHash; // main memory HashMap
-
-
-
   protected transient LogHelper console;
 
   private File dumpFile;
@@ -71,10 +72,9 @@ public class HashMapWrapper<K, V> implem
    * @param threshold
    *          User specified threshold to store new values into persistent storage.
    */
-  public HashMapWrapper(int threshold, float loadFactor) {
-    this.threshold = 0.9;
+  public HashMapWrapper(int threshold, float loadFactor, float memoryUsage) {
+    maxMemoryUsage = memoryUsage;
     mHash = new HashMap<K, V>(threshold, loadFactor);
-    console = new LogHelper(LOG);
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
     LOG.info("maximum memory: " + maxMemory);
@@ -83,30 +83,28 @@ public class HashMapWrapper<K, V> implem
   }
 
   public HashMapWrapper(int threshold) {
-    this(THRESHOLD, 0.75f);
+    this(threshold, LOADFACTOR, MEMORYUSAGE);
   }
 
   public HashMapWrapper() {
-    this(THRESHOLD, LOADFACTOR);
+    this(THRESHOLD, LOADFACTOR, MEMORYUSAGE);
   }
 
-
   public V get(K key) {
     return mHash.get(key);
   }
 
-
   public boolean put(K key, V value) throws HiveException {
     // isAbort();
     mHash.put(key, value);
     return false;
   }
 
+
   public void remove(K key) {
     mHash.remove(key);
   }
 
-
   /**
    * Flush the main memory hash table into the persistent cache file
    *
@@ -146,7 +144,6 @@ public class HashMapWrapper<K, V> implem
    * @throws HiveException
    */
   public void close() throws HiveException {
-    // isAbort();
     mHash.clear();
   }
 
@@ -158,36 +155,25 @@ public class HashMapWrapper<K, V> implem
     return mHash.size();
   }
 
-  private boolean isAbort() {
-    int size = mHash.size();
-    // if(size >= 1000000 && size % 1000000 == 0 ){
+  public boolean isAbort(long numRows,LogHelper console) {
     System.gc();
     System.gc();
+    int size = mHash.size();
     long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
     double rate = (double) usedMemory / (double) maxMemory;
     long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
-    console.printInfo("Hashtable size:\t" + size + "\tMemory usage:\t" + usedMemory + "\t rate:\t"
-        + num.format(rate));
-    return true;
-
-  }
-
-  public Log getLOG() {
-    return LOG;
+    console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
+        + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate));
+    if (rate > (double) maxMemoryUsage) {
+      return true;
+    }
+    return false;
   }
 
   public void setLOG(Log log) {
     LOG = log;
   }
 
-  public double getThreshold() {
-    return threshold;
-  }
-
-  public void setThreshold(double threshold) {
-    this.threshold = threshold;
-  }
-
   public HashMap<K, V> getMHash() {
     return mHash;
   }
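
Putting the reworked wrapper together, a hedged usage sketch: construct it with an explicit memory ceiling, load rows, and probe isAbort periodically. The generic types, probe interval, and 0.90 ceiling below are illustrative choices, not values from the patch.

import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;

// Usage sketch for the new HashMapWrapper(threshold, loadFactor, memoryUsage) constructor
// and the public isAbort(numRows, console) probe; all concrete values are assumptions.
public class HashMapWrapperExample {
  public static void main(String[] args) throws HiveException {
    LogHelper console = new LogHelper(LogFactory.getLog(HashMapWrapperExample.class), false);
    // threshold = 1,000,000 entries, load factor 0.75, abort above 90% of the heap
    HashMapWrapper<String, String> table =
        new HashMapWrapper<String, String>(1000000, 0.75f, 0.90f);
    for (long row = 1; row <= 5000; row++) {
      table.put("key-" + row, "value-" + row);
      // probe every 1000 rows, analogous to the hashTableScale throttling in the operator
      if (row % 1000 == 0 && table.isAbort(row, console)) {
        throw new HiveException("hash table exceeded the configured memory usage");
      }
    }
    table.close();
  }
}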

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java Wed Nov 17 17:33:06 2010
@@ -30,7 +30,7 @@ public interface Dispatcher {
 
   /**
    * Dispatcher function.
-   * 
+   *
    * @param nd
    *          operator to process.
    * @param stack
@@ -43,4 +43,5 @@ public interface Dispatcher {
    */
   Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
       throws SemanticException;
+
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/TaskGraphWalker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/TaskGraphWalker.java?rev=1036128&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/TaskGraphWalker.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/TaskGraphWalker.java Wed Nov 17 17:33:06 2010
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lib;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Walker for a graph of tasks. This class takes a list of starting tasks and
+ * walks them one by one. It maintains a list of dispatched tasks
+ * (dispatchedList) and a list of tasks that have been discovered but not yet
+ * dispatched.
+ */
+public class TaskGraphWalker implements GraphWalker {
+
+  public class TaskGraphWalkerContext {
+    // the walker's dispatched-node map; rules can mark extra nodes as dispatched through it
+    private final HashMap<Node, Object> reMap;
+
+    public TaskGraphWalkerContext(HashMap<Node, Object> reMap) {
+      this.reMap = reMap;
+    }
+
+    public void addToDispatchList(Node dispatchedObj) {
+      if (dispatchedObj != null) {
+        reMap.put(dispatchedObj, null);
+      }
+    }
+  }
+
+  protected Stack<Node> opStack;
+  private final List<Node> toWalk = new ArrayList<Node>();
+  private final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
+  private final Dispatcher dispatcher;
+  private final TaskGraphWalkerContext walkerCtx;
+
+  /**
+   * Constructor.
+   *
+   * @param disp
+   *          dispatcher to call for each op encountered
+   */
+  public TaskGraphWalker(Dispatcher disp) {
+    dispatcher = disp;
+    opStack = new Stack<Node>();
+    walkerCtx = new TaskGraphWalkerContext(retMap);
+  }
+
+  /**
+   * @return the toWalk
+   */
+  public List<Node> getToWalk() {
+    return toWalk;
+  }
+
+  /**
+   * @return the set of nodes that have already been dispatched
+   */
+  public Set<Node> getDispatchedList() {
+    return retMap.keySet();
+  }
+
+  /**
+   * Dispatch the current operator.
+   *
+   * @param nd
+   *          node being walked
+   * @param ndStack
+   *          stack of nodes encountered
+   * @throws SemanticException
+   */
+  public void dispatch(Node nd, Stack<Node> ndStack, TaskGraphWalkerContext walkerCtx)
+      throws SemanticException {
+    Object[] nodeOutputs = null;
+    if (nd.getChildren() != null) {
+      nodeOutputs = new Object[nd.getChildren().size()+1];
+      nodeOutputs[0] = walkerCtx;
+      int i = 1;
+      for (Node child : nd.getChildren()) {
+        nodeOutputs[i++] = retMap.get(child);
+      }
+    } else {
+      nodeOutputs = new Object[1];
+      nodeOutputs[0] = walkerCtx;
+    }
+
+    Object retVal = dispatcher.dispatch(nd, ndStack, nodeOutputs);
+    retMap.put(nd, retVal);
+  }
+
+  public void dispatch(Node nd, Stack<Node> ndStack) throws SemanticException {
+    Object[] nodeOutputs = null;
+    if (nd.getChildren() != null) {
+      nodeOutputs = new Object[nd.getChildren().size()];
+      int i = 0;
+      for (Node child : nd.getChildren()) {
+        nodeOutputs[i++] = retMap.get(child);
+      }
+    }
+
+    Object retVal = dispatcher.dispatch(nd, ndStack, nodeOutputs);
+    retMap.put(nd, retVal);
+  }
+
+  /**
+   * starting point for walking.
+   *
+   * @throws SemanticException
+   */
+  public void startWalking(Collection<Node> startNodes,
+      HashMap<Node, Object> nodeOutput) throws SemanticException {
+    toWalk.addAll(startNodes);
+    while (toWalk.size() > 0) {
+      Node nd = toWalk.remove(0);
+      walk(nd);
+      if (nodeOutput != null) {
+        nodeOutput.put(nd, retMap.get(nd));
+      }
+    }
+  }
+
+  /**
+   * walk the current task and its descendants.
+   *
+   * @param nd
+   *          current task in the graph
+   * @throws SemanticException
+   */
+  public void walk(Node nd) throws SemanticException {
+      if (!(nd instanceof Task)) {
+        throw new SemanticException("TaskGraphWalker only walks a graph of Task nodes");
+      }
+
+      if (getDispatchedList().contains(nd)) {
+        return;
+      }
+      if (opStack.empty() || nd != opStack.peek()) {
+        opStack.push(nd);
+      }
+
+      List<Task<? extends Serializable>> nextTaskList = null;
+      Set<Task<? extends Serializable>> nextTaskSet = new HashSet<Task<? extends Serializable>>();
+      List<Task<? extends Serializable>> taskListInConditionalTask = null;
+
+
+      if (nd instanceof ConditionalTask) {
+        // for a conditional task, the next task list is the union of the children of
+        // each task contained in the conditional task.
+        taskListInConditionalTask = ((ConditionalTask) nd).getListTasks();
+        for (Task<? extends Serializable> tsk : taskListInConditionalTask) {
+          List<Task<? extends Serializable>> childTasks = tsk.getChildTasks();
+          if (childTasks != null) {
+            nextTaskSet.addAll(childTasks);
+          }
+        }
+        // convert the set into a list
+        if (nextTaskSet.size() > 0) {
+          nextTaskList = new ArrayList<Task<? extends Serializable>>(nextTaskSet);
+        }
+      } else {
+        // for other tasks, just use their children tasks
+        nextTaskList = ((Task<? extends Serializable>) nd).getChildTasks();
+      }
+
+      if ((nextTaskList == null)
+          || getDispatchedList().containsAll(nextTaskList)) {
+        dispatch(nd, opStack, this.walkerCtx);
+        opStack.pop();
+        return;
+      }
+      // add children, self to the front of the queue in that order
+      getToWalk().add(0, nd);
+      getToWalk().removeAll(nextTaskList);
+      getToWalk().addAll(0, nextTaskList);
+
+  }
+}
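
A rough usage sketch for the new walker, not from this commit: the class name, method name and the rootTasks parameter are assumed; TaskGraphWalker and Dispatcher are the classes shown above.

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.lib.Dispatcher;
    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    public class TaskWalkSketch {
      public static HashMap<Node, Object> walkPlan(List<Task<? extends Serializable>> rootTasks,
          Dispatcher dispatcher) throws SemanticException {
        TaskGraphWalker walker = new TaskGraphWalker(dispatcher);

        // walk() rejects anything that is not a Task, so start from the plan's root tasks
        List<Node> startNodes = new ArrayList<Node>(rootTasks);

        // per-node dispatch results are collected here (passing null is also accepted)
        HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
        walker.startWalking(startNodes, nodeOutput);
        return nodeOutput;
      }
    }

Note that a node is only dispatched once all of its next tasks have been dispatched, so the dispatcher sees tasks bottom-up relative to the task graph.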

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Wed Nov 17 17:33:06 2010
@@ -179,7 +179,7 @@ public final class ColumnPrunerProcFacto
       cppCtx.getPrunedColLists().put((Operator<? extends Serializable>) nd,
           cols);
       ArrayList<Integer> needed_columns = new ArrayList<Integer>();
-      RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRR();
+      RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver();
       TableScanDesc desc = scanOp.getConf();
       List<VirtualColumn> virtualCols = desc.getVirtualCols();
       List<VirtualColumn> newVirtualCols = new ArrayList<VirtualColumn>();
@@ -232,7 +232,7 @@ public final class ColumnPrunerProcFacto
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
       HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap = cppCtx
           .getOpToParseCtxMap();
-      RowResolver redSinkRR = opToParseCtxMap.get(op).getRR();
+      RowResolver redSinkRR = opToParseCtxMap.get(op).getRowResolver();
       ReduceSinkDesc conf = op.getConf();
       List<Operator<? extends Serializable>> childOperators = op
           .getChildOperators();
@@ -250,7 +250,7 @@ public final class ColumnPrunerProcFacto
         assert parentOperators.size() == 1;
         Operator<? extends Serializable> par = parentOperators.get(0);
         JoinOperator childJoin = (JoinOperator) childOperators.get(0);
-        RowResolver parRR = opToParseCtxMap.get(par).getRR();
+        RowResolver parRR = opToParseCtxMap.get(par).getRowResolver();
         List<String> childJoinCols = cppCtx.getJoinPrunedColLists().get(
             childJoin).get((byte) conf.getTag());
         boolean[] flags = new boolean[conf.getValueCols().size()];
@@ -383,7 +383,7 @@ public final class ColumnPrunerProcFacto
         ArrayList<String> newOutputColumnNames = new ArrayList<String>();
         ArrayList<ColumnInfo> rs_oldsignature = op.getSchema().getSignature();
         ArrayList<ColumnInfo> rs_newsignature = new ArrayList<ColumnInfo>();
-        RowResolver old_rr = cppCtx.getOpToParseCtxMap().get(op).getRR();
+        RowResolver old_rr = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
         RowResolver new_rr = new RowResolver();
         for (String col : cols) {
           int index = originalOutputColumnNames.indexOf(col);
@@ -394,7 +394,7 @@ public final class ColumnPrunerProcFacto
           ColumnInfo columnInfo = old_rr.get(tabcol[0], tabcol[1]);
           new_rr.put(tabcol[0], tabcol[1], columnInfo);
         }
-        cppCtx.getOpToParseCtxMap().get(op).setRR(new_rr);
+        cppCtx.getOpToParseCtxMap().get(op).setRowResolver(new_rr);
         op.getSchema().setSignature(rs_newsignature);
         conf.setColList(newColList);
         conf.setOutputColumnNames(newOutputColumnNames);
@@ -465,7 +465,7 @@ public final class ColumnPrunerProcFacto
     Map<String, ExprNodeDesc> oldMap = reduce.getColumnExprMap();
     Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
     ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>();
-    RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(reduce).getRR();
+    RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(reduce).getRowResolver();
     RowResolver newRR = new RowResolver();
     ArrayList<String> originalValueOutputColNames = reduceConf
         .getOutputValueColumnNames();
@@ -493,7 +493,7 @@ public final class ColumnPrunerProcFacto
     ArrayList<ExprNodeDesc> keyCols = reduceConf.getKeyCols();
     List<String> keys = new ArrayList<String>();
     RowResolver parResover = cppCtx.getOpToParseCtxMap().get(
-        reduce.getParentOperators().get(0)).getRR();
+        reduce.getParentOperators().get(0)).getRowResolver();
     for (int i = 0; i < keyCols.size(); i++) {
       keys = Utilities.mergeUniqElems(keys, keyCols.get(i).getCols());
     }
@@ -506,7 +506,7 @@ public final class ColumnPrunerProcFacto
       }
     }
 
-    cppCtx.getOpToParseCtxMap().get(reduce).setRR(newRR);
+    cppCtx.getOpToParseCtxMap().get(reduce).setRowResolver(newRR);
     reduce.setColumnExprMap(newMap);
     reduce.getSchema().setSignature(sig);
     reduceConf.setOutputValueColumnNames(newOutputColNames);
@@ -614,7 +614,7 @@ public final class ColumnPrunerProcFacto
      }
     }
 
-    RowResolver joinRR = cppCtx.getOpToParseCtxMap().get(op).getRR();
+    RowResolver joinRR = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
     RowResolver newJoinRR = new RowResolver();
     ArrayList<String> outputCols = new ArrayList<String>();
     ArrayList<ColumnInfo> rs = new ArrayList<ColumnInfo>();
@@ -699,7 +699,7 @@ public final class ColumnPrunerProcFacto
     op.setColumnExprMap(newColExprMap);
     conf.setOutputColumnNames(outputCols);
     op.getSchema().setSignature(rs);
-    cppCtx.getOpToParseCtxMap().get(op).setRR(newJoinRR);
+    cppCtx.getOpToParseCtxMap().get(op).setRowResolver(newJoinRR);
     cppCtx.getJoinPrunedColLists().put(op, prunedColLists);
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Wed Nov 17 17:33:06 2010
@@ -217,7 +217,7 @@ public class GenMRFileSink1 implements N
 
     // Add the extract operator to get the value fields
     RowResolver out_rwsch = new RowResolver();
-    RowResolver interim_rwsch = ctx.getParseCtx().getOpParseCtx().get(fsOp).getRR();
+    RowResolver interim_rwsch = ctx.getParseCtx().getOpParseCtx().get(fsOp).getRowResolver();
     Integer pos = Integer.valueOf(0);
     for (ColumnInfo colInfo : interim_rwsch.getColumnInfos()) {
       String[] info = interim_rwsch.reverseLookup(colInfo.getInternalName());

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Nov 17 17:33:06 2010
@@ -858,8 +858,9 @@ public final class GenMapRedUtils {
 
     // create a dummy tableScan operator on top of op
     // TableScanOperator is implicitly created here for each MapOperator
+    RowResolver rowResolver = opProcCtx.getParseCtx().getOpParseCtx().get(parent).getRowResolver();
     Operator<? extends Serializable> ts_op = putOpInsertMap(OperatorFactory
-        .get(TableScanDesc.class, parent.getSchema()), null, parseCtx);
+        .get(TableScanDesc.class, parent.getSchema()), rowResolver, parseCtx);
 
     childOpList = new ArrayList<Operator<? extends Serializable>>();
     childOpList.add(op);
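
Illustration only, not part of the patch: with the dummy TableScanOperator now registered against its parent's RowResolver rather than null, a lookup of the shape below (the same accessor the hunks above switched to) returns a usable resolver for the generated scan. The helper class and method names are made up.

    import java.io.Serializable;

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.parse.OpParseContext;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.RowResolver;

    public class RowResolverLookupSketch {
      public static RowResolver resolverFor(ParseContext parseCtx,
          Operator<? extends Serializable> op) {
        OpParseContext opCtx = parseCtx.getOpParseCtx().get(op);
        // after this change, the dummy TableScanOperator is mapped with its parent's
        // RowResolver instead of null, so this lookup no longer yields null for it
        return opCtx == null ? null : opCtx.getRowResolver();
      }
    }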