You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/01/06 22:32:41 UTC

svn commit: r1556041 [3/42] - in /hive/trunk: cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Jan  6 21:32:38 2014
@@ -33,11 +33,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.DemuxOperator;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -52,6 +56,8 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
@@ -64,20 +70,31 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.InputFormat;
 
 /**
  * General utility common functions for the Processor to convert operator into
@@ -90,7 +107,7 @@ public final class GenMapRedUtils {
     LOG = LogFactory.getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
   }
 
-  private static boolean needsTagging(ReduceWork rWork) {
+  public static boolean needsTagging(ReduceWork rWork) {
     return rWork != null && (rWork.getReducer().getClass() == JoinOperator.class ||
          rWork.getReducer().getClass() == DemuxOperator.class);
   }
@@ -444,18 +461,38 @@ public final class GenMapRedUtils {
   public static void setTaskPlan(String alias_id,
       Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local,
       GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
-    MapWork plan = ((MapredWork) task.getWork()).getMapWork();
-    ParseContext parseCtx = opProcCtx.getParseCtx();
-    Set<ReadEntity> inputs = opProcCtx.getInputs();
+    setMapWork(((MapredWork) task.getWork()).getMapWork(), opProcCtx.getParseCtx(),
+        opProcCtx.getInputs(), pList, topOp, alias_id, opProcCtx.getConf(), local);
+    opProcCtx.addSeenOp(task, topOp);
+  }
 
+  /**
+   * initialize MapWork
+   *
+   * @param alias_id
+   *          current alias
+   * @param topOp
+   *          the top operator of the stack
+   * @param plan
+   *          map work to initialize
+   * @param local
+   *          whether you need to add to map-reduce or local work
+   * @param pList
+   *          pruned partition list. If it is null it will be computed on-the-fly.
+   * @param inputs
+   *          read entities for the map work
+   * @param conf
+   *          current instance of hive conf
+   */
+  public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntity> inputs,
+      PrunedPartitionList partsList, Operator<? extends OperatorDesc> topOp, String alias_id,
+      HiveConf conf, boolean local) throws SemanticException {
     ArrayList<Path> partDir = new ArrayList<Path>();
     ArrayList<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
 
     Path tblDir = null;
     TableDesc tblDesc = null;
 
-    PrunedPartitionList partsList = pList;
-
     plan.setNameToSplitSample(parseCtx.getNameToSplitSample());
 
     if (partsList == null) {
@@ -701,7 +738,6 @@ public final class GenMapRedUtils {
       }
       plan.setMapLocalWork(localPlan);
     }
-    opProcCtx.addSeenOp(task, topOp);
   }
 
   /**
@@ -751,6 +787,21 @@ public final class GenMapRedUtils {
   }
 
   /**
+   * Set key and value descriptor
+   * @param work ReduceWork
+   * @param rs ReduceSinkOperator
+   */
+  public static void setKeyAndValueDesc(ReduceWork work, ReduceSinkOperator rs) {
+    work.setKeyDesc(rs.getConf().getKeySerializeInfo());
+    int tag = Math.max(0, rs.getConf().getTag());
+    List<TableDesc> tagToSchema = work.getTagToValueDesc();
+    while (tag + 1 > tagToSchema.size()) {
+      tagToSchema.add(null);
+    }
+    tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
+  }
+
+  /**
    * set key and value descriptor.
    *
    * @param plan
@@ -766,13 +817,7 @@ public final class GenMapRedUtils {
 
     if (topOp instanceof ReduceSinkOperator) {
       ReduceSinkOperator rs = (ReduceSinkOperator) topOp;
-      plan.setKeyDesc(rs.getConf().getKeySerializeInfo());
-      int tag = Math.max(0, rs.getConf().getTag());
-      List<TableDesc> tagToSchema = plan.getTagToValueDesc();
-      while (tag + 1 > tagToSchema.size()) {
-        tagToSchema.add(null);
-      }
-      tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
+      setKeyAndValueDesc(plan, rs);
     } else {
       List<Operator<? extends OperatorDesc>> children = topOp.getChildOperators();
       if (children != null) {
@@ -1096,6 +1141,571 @@ public final class GenMapRedUtils {
     }
   }
 
+  /**
+   * @param fsInput The FileSink operator.
+   * @param ctx The MR processing context.
+   * @param finalName the final destination path the merge job should output.
+   * @param dependencyTask
+   * @param mvTasks
+   * @param conf
+   * @param currTask
+   * @throws SemanticException
+
+   * create a Map-only merge job using CombineHiveInputFormat for all partitions with
+   * following operators:
+   *          MR job J0:
+   *          ...
+   *          |
+   *          v
+   *          FileSinkOperator_1 (fsInput)
+   *          |
+   *          v
+   *          Merge job J1:
+   *          |
+   *          v
+   *          TableScan (using CombineHiveInputFormat) (tsMerge)
+   *          |
+   *          v
+   *          FileSinkOperator (fsMerge)
+   *
+   *          Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths
+   *          do
+   *          not contain the dynamic partitions (their parent). So after the dynamic partitions are
+   *          created (after the first job finished before the moveTask or ConditionalTask start),
+   *          we need to change the pathToPartitionInfo & pathToAlias to include the dynamic
+   *          partition
+   *          directories.
+   *
+   */
+  public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
+   Path finalName, DependencyCollectionTask dependencyTask,
+   List<Task<MoveWork>> mvTasks, HiveConf conf,
+   Task<? extends Serializable> currTask) throws SemanticException {
+
+    //
+    // 1. create the operator tree
+    //
+    FileSinkDesc fsInputDesc = fsInput.getConf();
+
+    // Create a TableScan operator
+    RowSchema inputRS = fsInput.getSchema();
+    Operator<? extends OperatorDesc> tsMerge =
+        GenMapRedUtils.createTemporaryTableScanOperator(inputRS);
+
+    // Create a FileSink operator
+    TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
+    FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName.toUri().toString(), ts,
+      conf.getBoolVar(ConfVars.COMPRESSRESULT));
+    FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
+      fsOutputDesc, inputRS, tsMerge);
+
+    // If the input FileSinkOperator is dynamic partition enabled, the tsMerge input schema
+    // needs to include the partition column, and the fsOutput should have
+    // a DynamicPartitionCtx to indicate that it needs to be dynamically partitioned.
+    DynamicPartitionCtx dpCtx = fsInputDesc.getDynPartCtx();
+    if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
+      // adding DP ColumnInfo to the RowSchema signature
+      ArrayList<ColumnInfo> signature = inputRS.getSignature();
+      String tblAlias = fsInputDesc.getTableInfo().getTableName();
+      LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
+      StringBuilder partCols = new StringBuilder();
+      for (String dpCol : dpCtx.getDPColNames()) {
+        ColumnInfo colInfo = new ColumnInfo(dpCol,
+            TypeInfoFactory.stringTypeInfo, // all partition column type should be string
+            tblAlias, true); // partition column is virtual column
+        signature.add(colInfo);
+        colMap.put(dpCol, dpCol); // input and output have the same column name
+        partCols.append(dpCol).append('/');
+      }
+      partCols.setLength(partCols.length() - 1); // remove the last '/'
+      inputRS.setSignature(signature);
+
+      // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
+      DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
+      dpCtx2.setInputToDPCols(colMap);
+      fsOutputDesc.setDynPartCtx(dpCtx2);
+
+      // update the FileSinkOperator to include partition columns
+      fsInputDesc.getTableInfo().getProperties().setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
+        partCols.toString()); // list of dynamic partition column names
+    } else {
+      // non-partitioned table
+      fsInputDesc.getTableInfo().getProperties().remove(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+    }
+
+    //
+    // 2. Constructing a conditional task consisting of a move task and a map reduce task
+    //
+    MoveWork dummyMv = new MoveWork(null, null, null,
+         new LoadFileDesc(new Path(fsInputDesc.getFinalDirName()), finalName, true, null, null), false);
+    MapWork cplan;
+    Serializable work;
+
+    if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
+        fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+
+      // Check if InputFormatClass is valid
+      String inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+      try {
+        Class c = (Class<? extends InputFormat>) Class.forName(inputFormatClass);
+
+        LOG.info("RCFile format- Using block level merge");
+        cplan = GenMapRedUtils.createRCFileMergeTask(fsInputDesc, finalName,
+            dpCtx != null && dpCtx.getNumDPCols() > 0);
+        work = cplan;
+      } catch (ClassNotFoundException e) {
+        String msg = "Illegal input format class: " + inputFormatClass;
+        throw new SemanticException(msg);
+      }
+
+    } else {
+      cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
+      work = new MapredWork();
+      ((MapredWork)work).setMapWork(cplan);
+    }
+    // use CombineHiveInputFormat for map-only merging
+    cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
+    // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
+    // know if merge MR2 will be triggered at execution time
+    ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
+        fsInputDesc.getFinalDirName());
+
+    // keep the dynamic partition context in conditional task resolver context
+    ConditionalResolverMergeFilesCtx mrCtx =
+        (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
+    mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
+    mrCtx.setLbCtx(fsInputDesc.getLbCtx());
+
+    //
+    // 3. add the moveTask as the children of the conditional task
+    //
+    linkMoveTask(fsOutput, cndTsk, mvTasks, conf, dependencyTask);
+  }
+
+  /**
+   * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
+   * possible subtrees branching from the ConditionalTask.
+   *
+   * @param newOutput
+   * @param cndTsk
+   * @param mvTasks
+   * @param hconf
+   * @param dependencyTask
+   */
+  public static void linkMoveTask(FileSinkOperator newOutput,
+      ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf,
+      DependencyCollectionTask dependencyTask) {
+
+    Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, newOutput);
+
+    for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
+      linkMoveTask(mvTask, tsk, hconf, dependencyTask);
+    }
+  }
+
+  /**
+   * Follows the task tree down from task and makes all leaves parents of mvTask
+   *
+   * @param mvTask
+   * @param task
+   * @param hconf
+   * @param dependencyTask
+   */
+  public static void linkMoveTask(Task<MoveWork> mvTask,
+      Task<? extends Serializable> task, HiveConf hconf,
+      DependencyCollectionTask dependencyTask) {
+
+    if (task.getDependentTasks() == null || task.getDependentTasks().isEmpty()) {
+      // If it's a leaf, add the move task as a child
+      addDependentMoveTasks(mvTask, hconf, task, dependencyTask);
+    } else {
+      // Otherwise, for each child run this method recursively
+      for (Task<? extends Serializable> childTask : task.getDependentTasks()) {
+        linkMoveTask(mvTask, childTask, hconf, dependencyTask);
+      }
+    }
+  }
+
+  /**
+   * Adds dependencyTask (if not null) as a dependent of parentTask. If mvTask is a
+   * load table and dependencyTask is set (see HIVE_MULTI_INSERT_ATOMIC_OUTPUTS), adds mvTask
+   * as a dependent of dependencyTask; otherwise adds mvTask as a dependent of parentTask.
+   * Does nothing if mvTask is null.
+   *
+   * @param mvTask
+   * @param hconf
+   * @param parentTask
+   * @param dependencyTask
+   */
+  public static void addDependentMoveTasks(Task<MoveWork> mvTask, HiveConf hconf,
+      Task<? extends Serializable> parentTask, DependencyCollectionTask dependencyTask) {
+
+    if (mvTask != null) {
+      if (dependencyTask != null) {
+        parentTask.addDependentTask(dependencyTask);
+        if (mvTask.getWork().getLoadTableWork() != null) {
+          // Moving tables/partitions depend on the dependencyTask
+          dependencyTask.addDependentTask(mvTask);
+        } else {
+          // Moving files depends on the parentTask (we still want the dependencyTask to depend
+          // on the parentTask)
+          parentTask.addDependentTask(mvTask);
+        }
+      } else {
+        parentTask.addDependentTask(mvTask);
+      }
+    }
+  }
+
+
+  /**
+   * Add the StatsTask as a dependent task of the MoveTask
+   * because StatsTask will change the Table/Partition metadata. For atomicity, we
+   * should not change it before the data is actually there done by MoveTask.
+   *
+   * @param nd
+   *          the FileSinkOperator whose results are taken care of by the MoveTask.
+   * @param mvTask
+   *          The MoveTask that moves the FileSinkOperator's results.
+   * @param currTask
+   *          The MapRedTask that the FileSinkOperator belongs to.
+   * @param hconf
+   *          HiveConf
+   */
+  public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
+      Task<? extends Serializable> currTask, HiveConf hconf) {
+
+    MoveWork mvWork = mvTask.getWork();
+    StatsWork statsWork = null;
+    if (mvWork.getLoadTableWork() != null) {
+      statsWork = new StatsWork(mvWork.getLoadTableWork());
+    } else if (mvWork.getLoadFileWork() != null) {
+      statsWork = new StatsWork(mvWork.getLoadFileWork());
+    }
+    assert statsWork != null : "Error when genereting StatsTask";
+
+    statsWork.setSourceTask(currTask);
+    statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+
+    if (currTask.getWork() instanceof MapredWork) {
+      MapredWork mrWork = (MapredWork) currTask.getWork();
+      mrWork.getMapWork().setGatheringStats(true);
+      if (mrWork.getReduceWork() != null) {
+        mrWork.getReduceWork().setGatheringStats(true);
+      }
+    }
+
+    // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
+    // in FileSinkDesc is used for stats publishing. They should be consistent.
+    statsWork.setAggKey(nd.getConf().getStatsAggPrefix());
+    Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
+
+    // mark the MapredWork and FileSinkOperator for gathering stats
+    nd.getConf().setGatherStats(true);
+    nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+    nd.getConf().setMaxStatsKeyPrefixLength(StatsFactory.getMaxPrefixLength(hconf));
+    // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName());
+
+    // subscribe feeds from the MoveTask so that MoveTask can forward the list
+    // of dynamic partitions to the StatsTask
+    mvTask.addDependentTask(statsTask);
+    statsTask.subscribeFeed(mvTask);
+  }
+
+  /**
+   * Returns true iff current query is an insert into for the given file sink
+   *
+   * @param parseCtx
+   * @param fsOp
+   * @return
+   */
+  public static boolean isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp) {
+    return fsOp.getConf().getTableInfo().getTableName() != null &&
+        parseCtx.getQB().getParseInfo().isInsertToTable();
+  }
+
+  /**
+   * Create a MapWork based on input path, the top operator and the input
+   * table descriptor.
+   *
+   * @param conf
+   * @param topOp
+   *          the table scan operator that is the root of the MapReduce task.
+   * @param fsDesc
+   *          the file sink descriptor that serves as the input to this merge task.
+   * @param parentMR
+   *          the parent MapReduce work
+   * @param parentFS
+   *          the last FileSinkOperator in the parent MapReduce work
+   * @return the MapWork
+   */
+  private static MapWork createMRWorkForMergingFiles (HiveConf conf,
+    Operator<? extends OperatorDesc> topOp,  FileSinkDesc fsDesc) {
+
+    ArrayList<String> aliases = new ArrayList<String>();
+    String inputDir = fsDesc.getFinalDirName();
+    TableDesc tblDesc = fsDesc.getTableInfo();
+    aliases.add(inputDir); // dummy alias: just use the input path
+
+    // constructing the default MapredWork
+    MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+    MapWork cplan = cMrPlan.getMapWork();
+    cplan.getPathToAliases().put(inputDir, aliases);
+    cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
+    cplan.getAliasToWork().put(inputDir, topOp);
+    cplan.setMapperCannotSpanPartns(true);
+
+    return cplan;
+  }
+
+  /**
+   * Create a block level merge task for RCFiles.
+   *
+   * @param fsInputDesc
+   * @param finalName
+   * @return MergeWork if table is stored as RCFile
+   * @throws SemanticException if the table is not stored as RCFile
+   */
+  public static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+      Path finalName, boolean hasDynamicPartitions) throws SemanticException {
+
+    String inputDir = fsInputDesc.getFinalDirName();
+    TableDesc tblDesc = fsInputDesc.getTableInfo();
+
+    if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+      ArrayList<Path> inputDirs = new ArrayList<Path>(1);
+      ArrayList<String> inputDirstr = new ArrayList<String>(1);
+      if (!hasDynamicPartitions
+          && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+        inputDirs.add(new Path(inputDir));
+        inputDirstr.add(inputDir);
+      }
+
+      MergeWork work = new MergeWork(inputDirs, finalName,
+          hasDynamicPartitions, fsInputDesc.getDynPartCtx());
+      LinkedHashMap<String, ArrayList<String>> pathToAliases =
+          new LinkedHashMap<String, ArrayList<String>>();
+      pathToAliases.put(inputDir, (ArrayList<String>) inputDirstr.clone());
+      work.setMapperCannotSpanPartns(true);
+      work.setPathToAliases(pathToAliases);
+      work.setAliasToWork(
+          new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
+      if (hasDynamicPartitions
+          || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+        work.getPathToPartitionInfo().put(inputDir,
+            new PartitionDesc(tblDesc, null));
+      }
+      work.setListBucketingCtx(fsInputDesc.getLbCtx());
+
+      return work;
+    }
+
+    throw new SemanticException("createRCFileMergeTask called on non-RCFile table");
+  }
+
+  /**
+   * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
+   *
+   * @param conf
+   *          HiveConf
+   * @param currTask
+   *          current leaf task
+   * @param mvWork
+   *          MoveWork for the move task
+   * @param mergeWork
+   *          MapredWork for the merge task.
+   * @param inputPath
+   *          the input directory of the merge/move task
+   * @return The conditional task
+   */
+  @SuppressWarnings("unchecked")
+  public static ConditionalTask createCondTask(HiveConf conf,
+      Task<? extends Serializable> currTask, MoveWork mvWork,
+      Serializable mergeWork, String inputPath) {
+
+    // There are 3 options for this ConditionalTask:
+    // 1) Merge the partitions
+    // 2) Move the partitions (i.e. don't merge the partitions)
+    // 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't
+    // merge others) in this case the merge is done first followed by the move to prevent
+    // conflicts.
+    Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
+    Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
+    Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
+    Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf);
+
+    // NOTE! It is necessary merge task is the parent of the move task, and not
+    // the other way around, for the proper execution of the execute method of
+    // ConditionalTask
+    mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask);
+
+    List<Serializable> listWorks = new ArrayList<Serializable>();
+    listWorks.add(mvWork);
+    listWorks.add(mergeWork);
+
+    ConditionalWork cndWork = new ConditionalWork(listWorks);
+
+    List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
+    listTasks.add(moveOnlyMoveTask);
+    listTasks.add(mergeOnlyMergeTask);
+    listTasks.add(mergeAndMoveMergeTask);
+
+    ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, conf);
+    cndTsk.setListTasks(listTasks);
+
+    // create resolver
+    cndTsk.setResolver(new ConditionalResolverMergeFiles());
+    ConditionalResolverMergeFilesCtx mrCtx =
+        new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
+    cndTsk.setResolverCtx(mrCtx);
+
+    // make the conditional task as the child of the current leaf task
+    currTask.addDependentTask(cndTsk);
+
+    return cndTsk;
+  }
+
+  /**
+   * check if it is skewed table and stored as dirs.
+   *
+   * @param fsInputDesc
+   * @return
+   */
+  public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
+    return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
+        .isSkewedStoredAsDir();
+  }
+
+  public static Task<MoveWork> findMoveTask(
+      List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) {
+    // find the move task
+    for (Task<MoveWork> mvTsk : mvTasks) {
+      MoveWork mvWork = mvTsk.getWork();
+      Path srcDir = null;
+      if (mvWork.getLoadFileWork() != null) {
+        srcDir = mvWork.getLoadFileWork().getSourcePath();
+      } else if (mvWork.getLoadTableWork() != null) {
+        srcDir = mvWork.getLoadTableWork().getSourcePath();
+      }
+
+      if ((srcDir != null)
+          && (srcDir.equals(new Path(fsOp.getConf().getFinalDirName())))) {
+        return mvTsk;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns true iff the fsOp requires a merge
+   * @param mvTasks
+   * @param hconf
+   * @param fsOp
+   * @param currTask
+   * @param isInsertTable
+   * @return
+   */
+  public static boolean isMergeRequired(List<Task<MoveWork>> mvTasks, HiveConf hconf, FileSinkOperator fsOp,
+      Task<? extends Serializable> currTask, boolean isInsertTable) {
+
+    // Has the user enabled merging of files for map-only jobs or for all jobs
+    if ((mvTasks != null) && (!mvTasks.isEmpty())) {
+
+      // no need of merging if the move is to a local file system
+      MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+
+      if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
+        GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf);
+      }
+
+      if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) {
+        if (fsOp.getConf().isLinkedFileSink()) {
+          // If the user has HIVEMERGEMAPREDFILES set to false, the idea was the
+          // number of reducers are few, so the number of files anyway are small.
+          // However, with this optimization, we are increasing the number of files
+          // possibly by a big margin. So, merge aggressively.
+          if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
+              hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) {
+            return true;
+          }
+        } else {
+          // There are separate configuration parameters to control whether to
+          // merge for a map-only job
+          // or for a map-reduce job
+          if (currTask.getWork() instanceof MapredWork) {
+            ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork();
+            boolean mergeMapOnly =
+                hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
+            boolean mergeMapRed =
+                hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
+                reduceWork != null;
+            if (mergeMapOnly || mergeMapRed) {
+              return true;
+            }
+          } else {
+            return false;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Create and add any dependent move tasks
+   *
+   * @param currTask
+   * @param chDir
+   * @param fsOp
+   * @param parseCtx
+   * @param mvTasks
+   * @param hconf
+   * @param dependencyTask
+   * @return
+   */
+  public static Path createMoveTask(Task<? extends Serializable> currTask, boolean chDir,
+      FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks,
+      HiveConf hconf, DependencyCollectionTask dependencyTask) {
+
+    Path dest = null;
+
+    if (chDir) {
+      dest = new Path(fsOp.getConf().getFinalDirName());
+
+      // generate the temporary file
+      // it must be on the same file system as the current destination
+      Context baseCtx = parseCtx.getContext();
+      String tmpDir = baseCtx.getExternalTmpFileURI(dest.toUri());
+
+      FileSinkDesc fileSinkDesc = fsOp.getConf();
+      // Change all the linked file sink descriptors
+      if (fileSinkDesc.isLinkedFileSink()) {
+        for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
+          String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName());
+          fsConf.setParentDir(tmpDir);
+          fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName);
+        }
+      } else {
+        fileSinkDesc.setDirName(tmpDir);
+      }
+    }
+
+    Task<MoveWork> mvTask = null;
+
+    if (!chDir) {
+      mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+    }
+
+    // Set the move task to be dependent on the current task
+    if (mvTask != null) {
+      GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask);
+    }
+
+    return dest;
+  }
+
   private GenMapRedUtils() {
     // prevent instantiation
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Mon Jan  6 21:32:38 2014
@@ -286,7 +286,7 @@ public class GroupByOptimizer implements
       currOp = currOp.getParentOperators().get(0);
 
       while (true) {
-        if (currOp.getParentOperators() == null) {
+        if ((currOp.getParentOperators() == null) || (currOp.getParentOperators().isEmpty())) {
           break;
         }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Jan  6 21:32:38 2014
@@ -236,13 +236,14 @@ public class MapJoinProcessor implements
    * @return the alias to the big table
    * @throws SemanticException
    */
-  public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
+  public static String genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork,
+    JoinOperator op, int mapJoinPos)
       throws SemanticException {
     LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
         newWork.getMapWork().getOpParseCtxMap();
     QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
     // generate the map join operator; already checked the map join
-    MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
+    MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(conf, opParseCtxMap, op,
         newJoinTree, mapJoinPos, true, false);
     return genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
   }
@@ -315,7 +316,7 @@ public class MapJoinProcessor implements
    *          are cached in memory
    * @param noCheckOuterJoin
    */
-  public static MapJoinOperator convertMapJoin(
+  public static MapJoinOperator convertMapJoin(HiveConf conf,
     LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
     JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
     boolean validateMapJoinTree)
@@ -372,21 +373,90 @@ public class MapJoinProcessor implements
       pos++;
     }
 
-    // get the join keys from old parent ReduceSink operators
+    // create the map-join operator
+    MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
+        op, joinTree, mapJoinPos, noCheckOuterJoin);
+
+
+    // remove old parents
     for (pos = 0; pos < newParentOps.size(); pos++) {
-      ReduceSinkOperator oldPar = (ReduceSinkOperator) oldReduceSinkParentOps.get(pos);
-      ReduceSinkDesc rsconf = oldPar.getConf();
+      newParentOps.get(pos).removeChild(oldReduceSinkParentOps.get(pos));
+      newParentOps.get(pos).getChildOperators().add(mapJoinOp);
+    }
+
+
+    mapJoinOp.getParentOperators().removeAll(oldReduceSinkParentOps);
+    mapJoinOp.setParentOperators(newParentOps);
+
+    // make sure only map-joins can be performed.
+    if (validateMapJoinTree) {
+      validateMapJoinTypes(mapJoinOp);
+    }
+
+    // the children of the original join operator were already re-pointed to the
+    // map join operator inside convertJoinOpMapJoinOp
+
+    return mapJoinOp;
+  }
+
+  public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
+      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
+      throws SemanticException {
+
+    JoinDesc desc = op.getConf();
+    JoinCondDesc[] condns = desc.getConds();
+    Byte[] tagOrder = desc.getTagOrder();
+
+    // outer join cannot be performed on a table which is being cached
+    if (!noCheckOuterJoin) {
+      if (checkMapJoin(mapJoinPos, condns) < 0) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+    }
+
+    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
+    // Walk over all the sources (which are guaranteed to be reduce sink
+    // operators).
+    // The join outputs a concatenation of all the inputs.
+    QBJoinTree leftSrc = joinTree.getJoinSrc();
+    List<Operator<? extends OperatorDesc>> oldReduceSinkParentOps =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    if (leftSrc != null) {
+      // assert mapJoinPos == 0;
+      Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
+      assert parentOp.getParentOperators().size() == 1;
+      oldReduceSinkParentOps.add(parentOp);
+    }
+
+
+    byte pos = 0;
+    for (String src : joinTree.getBaseSrc()) {
+      if (src != null) {
+        Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
+        assert parentOp.getParentOperators().size() == 1;
+        oldReduceSinkParentOps.add(parentOp);
+      }
+      pos++;
+    }
+
+    // get the join keys from old parent ReduceSink operators
+    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+      ReduceSinkOperator parent = (ReduceSinkOperator) oldReduceSinkParentOps.get(pos);
+      ReduceSinkDesc rsconf = parent.getConf();
       List<ExprNodeDesc> keys = rsconf.getKeyCols();
       keyExprMap.put(pos, keys);
     }
 
-    // removing RS, only ExprNodeDesc is changed (key/value/filter exprs and colExprMap)
-    // others (output column-name, RR, schema) remain intact
-    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
-    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+    List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
+    StringBuilder keyOrder = new StringBuilder();
+    for (int i = 0; i < keyCols.size(); i++) {
+      keyOrder.append("+");
+    }
 
+    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
     List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
-
     Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
     Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
     for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
@@ -410,45 +480,12 @@ public class MapJoinProcessor implements
       }
     }
 
-    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
-    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
-    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
-      byte srcTag = entry.getKey();
-      List<ExprNodeDesc> filter = entry.getValue();
-
-      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
-      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
-    }
-    desc.setFilters(filters = newFilters);
-
-    // remove old parents
-    for (pos = 0; pos < newParentOps.size(); pos++) {
-      newParentOps.get(pos).removeChild(oldReduceSinkParentOps.get(pos));
-    }
-
-    JoinCondDesc[] joinCondns = op.getConf().getConds();
-
-    Operator[] newPar = new Operator[newParentOps.size()];
-    pos = 0;
-    for (Operator<? extends OperatorDesc> o : newParentOps) {
-      newPar[pos++] = o;
-    }
-
-    List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
-    StringBuilder keyOrder = new StringBuilder();
-    for (int i = 0; i < keyCols.size(); i++) {
-      keyOrder.append("+");
-    }
-
-    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
-        .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
-
+    // construct valueTableDescs and valueFilteredTableDescs
     List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
     List<TableDesc> valueFiltedTableDescs = new ArrayList<TableDesc>();
-
     int[][] filterMap = desc.getFilterMap();
-    for (pos = 0; pos < newParentOps.size(); pos++) {
-      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+      List<ExprNodeDesc> valueCols = newValueExprs.get(Byte.valueOf((byte) pos));
       int length = valueCols.size();
       List<ExprNodeDesc> valueFilteredCols = new ArrayList<ExprNodeDesc>(length);
       // deep copy expr node desc
@@ -475,6 +512,19 @@ public class MapJoinProcessor implements
       valueTableDescs.add(valueTableDesc);
       valueFiltedTableDescs.add(valueFilteredTableDesc);
     }
+
+    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
+    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
+      byte srcTag = entry.getKey();
+      List<ExprNodeDesc> filter = entry.getValue();
+
+      Operator<?> terminal = op.getParentOperators().get(srcTag);
+      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
+    }
+    desc.setFilters(filters = newFilters);
+
+    // create dumpfile prefix needed to create descriptor
     String dumpFilePrefix = "";
     if( joinTree.getMapAliases() != null ) {
       for(String mapAlias : joinTree.getMapAliases()) {
@@ -484,15 +534,24 @@ public class MapJoinProcessor implements
     } else {
       dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix();
     }
+
+    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
+        PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+    JoinCondDesc[] joinCondns = op.getConf().getConds();
     MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
         valueTableDescs, valueFiltedTableDescs, outputColumnNames, mapJoinPos, joinCondns,
         filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
+    mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
     mapJoinDescriptor.setTagOrder(tagOrder);
     mapJoinDescriptor.setNullSafes(desc.getNullSafes());
     mapJoinDescriptor.setFilterMap(desc.getFilterMap());
 
+    // row resolver registered for the original join operator, reused for the map join op
+    RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
+
     MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
-        mapJoinDescriptor, new RowSchema(outputRS.getColumnInfos()), newPar);
+        mapJoinDescriptor, new RowSchema(outputRS.getColumnInfos()), op.getParentOperators());
 
     OpParseContext ctx = new OpParseContext(outputRS);
     opParseCtxMap.put(mapJoinOp, ctx);
@@ -500,24 +559,17 @@ public class MapJoinProcessor implements
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
     mapJoinOp.setColumnExprMap(colExprMap);
 
-    // change the children of the original join operator to point to the map
-    // join operator
     List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
     for (Operator<? extends OperatorDesc> childOp : childOps) {
       childOp.replaceParent(op, mapJoinOp);
     }
 
     mapJoinOp.setChildOperators(childOps);
-    mapJoinOp.setParentOperators(newParentOps);
     op.setChildOperators(null);
     op.setParentOperators(null);
 
-    // make sure only map-joins can be performed.
-    if (validateMapJoinTree) {
-      validateMapJoinTypes(mapJoinOp);
-    }
-
     return mapJoinOp;
+
   }
 
   /**
@@ -533,14 +585,14 @@ public class MapJoinProcessor implements
    *          are cached in memory
    * @param noCheckOuterJoin
    */
-  public static MapJoinOperator convertSMBJoinToMapJoin(
+  public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
     Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
     SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin)
     throws SemanticException {
     // Create a new map join operator
     SMBJoinDesc smbJoinDesc = smbJoinOp.getConf();
     List<ExprNodeDesc> keyCols = smbJoinDesc.getKeys().get(Byte.valueOf((byte) 0));
-    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
+    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, PlanUtils
         .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
     MapJoinDesc mapJoinDesc = new MapJoinDesc(smbJoinDesc.getKeys(),
         keyTableDesc, smbJoinDesc.getExprs(),
@@ -549,6 +601,8 @@ public class MapJoinProcessor implements
         bigTablePos, smbJoinDesc.getConds(),
         smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix());
 
+    mapJoinDesc.setStatistics(smbJoinDesc.getStatistics());
+
     RowResolver joinRS = opParseCtxMap.get(smbJoinOp).getRowResolver();
     // The mapjoin has the same schema as the join operator
     MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
@@ -588,8 +642,8 @@ public class MapJoinProcessor implements
 
     LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap = pctx
         .getOpParseCtx();
-    MapJoinOperator mapJoinOp = convertMapJoin(opParseCtxMap, op, joinTree, mapJoinPos,
-        noCheckOuterJoin, true);
+    MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op,
+        joinTree, mapJoinPos, noCheckOuterJoin, true);
     // create a dummy select to select all columns
     genSelectPlan(pctx, mapJoinOp);
     return mapJoinOp;
@@ -609,7 +663,7 @@ public class MapJoinProcessor implements
    * If see a right outer join, set lastSeenRightOuterJoin to true, clear the
    * bigTableCandidates, and add right side to the bigTableCandidates, it means
    * the right side of a right outer join always win. If see a full outer join,
-   * return null immediately (no one can be the big table, can not do a
+   * return empty set immediately (no one can be the big table, can not do a
    * mapjoin).
    *
    *
@@ -635,7 +689,8 @@ public class MapJoinProcessor implements
         // changed in future, these 2 are not missing.
         seenOuterJoin = true;
         lastSeenRightOuterJoin = false;
-        return null;
+        // empty set - cannot convert
+        return new HashSet<Integer>();
       } else if (joinType == JoinDesc.LEFT_OUTER_JOIN
           || joinType == JoinDesc.LEFT_SEMI_JOIN) {
         seenOuterJoin = true;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Mon Jan  6 21:32:38 2014
@@ -398,7 +398,8 @@ public class SkewJoinOptimizer implement
             return parseContext.getTopToTable().get(tsOp);
           }
         }
-        if ((op.getParentOperators() == null) || (op.getParentOperators().size() > 1)) {
+        if ((op.getParentOperators() == null) || (op.getParentOperators().isEmpty()) || 
+            (op.getParentOperators().size() > 1)) {
           return null;
         }
         op = op.getParentOperators().get(0);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java Mon Jan  6 21:32:38 2014
@@ -168,7 +168,7 @@ public class CorrelationOptimizer implem
       int numAliases = order.length;
       Set<Integer> bigTableCandidates =
           MapJoinProcessor.getBigTableCandidates(joinDesc.getConds());
-      if (bigTableCandidates == null) {
+      if (bigTableCandidates.isEmpty()) {
         continue;
       }
 
@@ -346,7 +346,7 @@ public class CorrelationOptimizer implem
             "involved in this operator");
         return correlatedReduceSinkOperators;
       }
-      if (current.getParentOperators() == null) {
+      if ((current.getParentOperators() == null) || (current.getParentOperators().isEmpty())) {
         return correlatedReduceSinkOperators;
       }
       if (current instanceof PTFOperator) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Mon Jan  6 21:32:38 2014
@@ -189,7 +189,8 @@ public class CommonJoinTaskDispatcher ex
 
     // optimize this newWork given the big table position
     String bigTableAlias =
-        MapJoinProcessor.genMapJoinOpAndLocalWork(newWork, newJoinOp, bigTablePosition);
+        MapJoinProcessor.genMapJoinOpAndLocalWork(physicalContext.getParseContext().getConf(),
+            newWork, newJoinOp, bigTablePosition);
     return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
   }
 
@@ -434,7 +435,7 @@ public class CommonJoinTaskDispatcher ex
           .getConds());
 
       // no table could be the big table; there is no need to convert
-      if (bigTableCandidates == null) {
+      if (bigTableCandidates.isEmpty()) {
         return null;
       }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java Mon Jan  6 21:32:38 2014
@@ -205,7 +205,7 @@ public class SortMergeJoinTaskDispatcher
 
     Operator<? extends OperatorDesc> currOp = originalSMBJoinOp;
     while (true) {
-      if (currOp.getChildOperators() == null) {
+      if ((currOp.getChildOperators() == null) || (currOp.getChildOperators().isEmpty())) {
         if (currOp instanceof FileSinkOperator) {
           FileSinkOperator fsOp = (FileSinkOperator)currOp;
           // The query has enforced that a sort-merge join should be performed.
@@ -433,7 +433,8 @@ public class SortMergeJoinTaskDispatcher
     opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp));
 
     // generate the map join operator
-    return MapJoinProcessor.convertSMBJoinToMapJoin(opParseContextMap, newSMBJoinOp,
+    return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(),
+        opParseContextMap, newSMBJoinOp,
         joinTree, mapJoinPos, true);
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java Mon Jan  6 21:32:38 2014
@@ -22,10 +22,10 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,21 +33,14 @@ import org.apache.hadoop.fs.ContentSumma
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
@@ -61,9 +54,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
 import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
@@ -73,302 +63,25 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
 import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1;
 import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
-import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
-import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.shims.ShimLoader;
 
-public class MapReduceCompiler {
+public class MapReduceCompiler extends TaskCompiler {
 
   protected final Log LOG = LogFactory.getLog(MapReduceCompiler.class);
-  private Hive db;
-  protected LogHelper console;
-  private HiveConf conf;
-
 
   public MapReduceCompiler() {
   }
 
-  public void init(HiveConf conf, LogHelper console, Hive db) {
-    this.conf = conf;
-    this.db = db;
-    this.console = console;
-  }
-
-  @SuppressWarnings({"nls", "unchecked"})
-  public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
-      final HashSet<ReadEntity> inputs, final HashSet<WriteEntity> outputs) throws SemanticException {
-
-    Context ctx = pCtx.getContext();
-    GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx();
-    QB qb = pCtx.getQB();
-    List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>();
-
-    List<LoadTableDesc> loadTableWork = pCtx.getLoadTableWork();
-    List<LoadFileDesc> loadFileWork = pCtx.getLoadFileWork();
-
-    boolean isCStats = qb.isAnalyzeRewrite();
-
-    if (pCtx.getFetchTask() != null) {
-      return;
-    }
-
-    /*
-     * In case of a select, use a fetch task instead of a move task.
-     * If the select is from analyze table column rewrite, don't create a fetch task. Instead create
-     * a column stats task later.
-     */
-    if (pCtx.getQB().getIsQuery() && !isCStats) {
-      if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) {
-        throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg());
-      }
-
-      LoadFileDesc loadFileDesc = loadFileWork.get(0);
-
-      String cols = loadFileDesc.getColumns();
-      String colTypes = loadFileDesc.getColumnTypes();
-
-      TableDesc resultTab = pCtx.getFetchTabledesc();
-      if (resultTab == null) {
-        String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
-        resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
-      }
-
-      FetchWork fetch = new FetchWork(loadFileDesc.getSourcePath(),
-          resultTab, qb.getParseInfo().getOuterQueryLimit());
-      fetch.setSource(pCtx.getFetchSource());
-      fetch.setSink(pCtx.getFetchSink());
-
-      pCtx.setFetchTask((FetchTask) TaskFactory.get(fetch, conf));
-
-      // For the FetchTask, the limit optimization requires we fetch all the rows
-      // in memory and count how many rows we get. It's not practical if the
-      // limit factor is too big
-      int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH);
-      if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) {
-        LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit
-            + ". Doesn't qualify limit optimiztion.");
-        globalLimitCtx.disableOpt();
-      }
-    } else if (!isCStats) {
-      for (LoadTableDesc ltd : loadTableWork) {
-        Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
-        mvTask.add(tsk);
-        // Check to see if we are stale'ing any indexes and auto-update them if we want
-        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
-          IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf);
-          try {
-            List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater
-                .generateUpdateTasks();
-            for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
-              tsk.addDependentTask(updateTask);
-            }
-          } catch (HiveException e) {
-            console
-                .printInfo("WARNING: could not auto-update stale indexes, which are not in sync");
-          }
-        }
-      }
-
-      boolean oneLoadFile = true;
-      for (LoadFileDesc lfd : loadFileWork) {
-        if (qb.isCTAS()) {
-          assert (oneLoadFile); // should not have more than 1 load file for
-          // CTAS
-          // make the movetask's destination directory the table's destination.
-          Path location;
-          String loc = qb.getTableDesc().getLocation();
-          if (loc == null) {
-            // get the table's default location
-            Table dumpTable;
-            Path targetPath;
-            try {
-              dumpTable = db.newTable(qb.getTableDesc().getTableName());
-              if (!db.databaseExists(dumpTable.getDbName())) {
-                throw new SemanticException("ERROR: The database " + dumpTable.getDbName()
-                    + " does not exist.");
-              }
-              Warehouse wh = new Warehouse(conf);
-              targetPath = wh.getTablePath(db.getDatabase(dumpTable.getDbName()), dumpTable
-                  .getTableName());
-            } catch (HiveException e) {
-              throw new SemanticException(e);
-            } catch (MetaException e) {
-              throw new SemanticException(e);
-            }
-
-            location = targetPath;
-          } else {
-              location = new Path(loc);
-          }
-          lfd.setTargetDir(location);
-
-          oneLoadFile = false;
-        }
-        mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf));
-      }
-    }
-
-    // generate map reduce plans
-    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
-    GenMRProcContext procCtx = new GenMRProcContext(
-        conf,
-        new HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>(),
-        tempParseContext, mvTask, rootTasks,
-        new LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx>(),
-        inputs, outputs);
-
-    // create a walker which walks the tree in a DFS manner while maintaining
-    // the operator stack.
-    // The dispatcher generates the plan from the operator tree
-    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp(new String("R1"),
-        TableScanOperator.getOperatorName() + "%"),
-        new GenMRTableScan1());
-    opRules.put(new RuleRegExp(new String("R2"),
-        TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
-        new GenMRRedSink1());
-    opRules.put(new RuleRegExp(new String("R3"),
-        ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
-        new GenMRRedSink2());
-    opRules.put(new RuleRegExp(new String("R4"),
-        FileSinkOperator.getOperatorName() + "%"),
-        new GenMRFileSink1());
-    opRules.put(new RuleRegExp(new String("R5"),
-        UnionOperator.getOperatorName() + "%"),
-        new GenMRUnion1());
-    opRules.put(new RuleRegExp(new String("R6"),
-        UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
-        new GenMRRedSink3());
-    opRules.put(new RuleRegExp(new String("R7"),
-        MapJoinOperator.getOperatorName() + "%"),
-        MapJoinFactory.getTableScanMapJoin());
-
-    // The dispatcher fires the processor corresponding to the closest matching
-    // rule and passes the context along
-    Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules,
-        procCtx);
-
-    GraphWalker ogw = new GenMapRedWalker(disp);
-    ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
-    ogw.startWalking(topNodes, null);
-
-    /*
-     * If the query was the result of analyze table column compute statistics rewrite, create
-     * a column stats task instead of a fetch task to persist stats to the metastore.
-     */
-    if (isCStats) {
-      genColumnStatsTask(qb, loadTableWork, loadFileWork, rootTasks);
-    }
-
-    // reduce sink does not have any kids - since the plan by now has been
-    // broken up into multiple
-    // tasks, iterate over all tasks.
-    // For each task, go over all operators recursively
-    for (Task<? extends Serializable> rootTask : rootTasks) {
-      breakTaskTree(rootTask);
-    }
-
-    // For each task, set the key descriptor for the reducer
-    for (Task<? extends Serializable> rootTask : rootTasks) {
-      GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
-    }
-
-    // If a task contains an operator which instructs bucketizedhiveinputformat
-    // to be used, please do so
-    for (Task<? extends Serializable> rootTask : rootTasks) {
-      setInputFormat(rootTask);
-    }
-
-    PhysicalContext physicalContext = new PhysicalContext(conf,
-        getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask());
-    PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
-        physicalContext, conf);
-    physicalOptimizer.optimize();
-
-    decideExecMode(rootTasks, ctx, globalLimitCtx);
-
-    if (qb.isCTAS()) {
-      // generate a DDL task and make it a dependent task of the leaf
-      CreateTableDesc crtTblDesc = qb.getTableDesc();
-
-      crtTblDesc.validate();
-
-      // Clear the output for CTAS since we don't need the output from the
-      // mapredWork, the
-      // DDLWork at the tail of the chain will have the output
-      outputs.clear();
-
-      Task<? extends Serializable> crtTblTask = TaskFactory.get(new DDLWork(
-          inputs, outputs, crtTblDesc), conf);
-
-      // find all leaf tasks and make the DDLTask as a dependent task of all of
-      // them
-      HashSet<Task<? extends Serializable>> leaves = new HashSet<Task<? extends Serializable>>();
-      getLeafTasks(rootTasks, leaves);
-      assert (leaves.size() > 0);
-      for (Task<? extends Serializable> task : leaves) {
-        if (task instanceof StatsTask) {
-          // StatsTask require table to already exist
-          for (Task<? extends Serializable> parentOfStatsTask : task.getParentTasks()) {
-            parentOfStatsTask.addDependentTask(crtTblTask);
-          }
-          for (Task<? extends Serializable> parentOfCrtTblTask : crtTblTask.getParentTasks()) {
-            parentOfCrtTblTask.removeDependentTask(task);
-          }
-          crtTblTask.addDependentTask(task);
-        } else {
-          task.addDependentTask(crtTblTask);
-        }
-      }
-    }
-
-    if (globalLimitCtx.isEnable() && pCtx.getFetchTask() != null) {
-      LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit());
-      pCtx.getFetchTask().getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit());
-    }
-
-    if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) {
-      LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit());
-      globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit());
-      List<ExecDriver> mrTasks = Utilities.getMRTasks(rootTasks);
-      for (ExecDriver tsk : mrTasks) {
-        tsk.setRetryCmdWhenFail(true);
-      }
-    }
-  }
-
-  private void setInputFormat(MapWork work, Operator<? extends OperatorDesc> op) {
-    if (op.isUseBucketizedHiveInputFormat()) {
-      work.setUseBucketizedHiveInputFormat(true);
-      return;
-    }
-
-    if (op.getChildOperators() != null) {
-      for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
-        setInputFormat(work, childOp);
-      }
-    }
-  }
-
   // loop over all the tasks recursively
-  private void setInputFormat(Task<? extends Serializable> task) {
+  @Override
+  protected void setInputFormat(Task<? extends Serializable> task) {
     if (task instanceof ExecDriver) {
       MapWork work = ((MapredWork) task.getWork()).getMapWork();
       HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork();
@@ -392,6 +105,20 @@ public class MapReduceCompiler {
     }
   }
 
+  // Turns on the bucketized input format for work when op, or any operator
+  // reachable through its child operators, reports that it uses it.
+  private void setInputFormat(MapWork work, Operator<? extends OperatorDesc> op) {
+    if (op.isUseBucketizedHiveInputFormat()) {
+      work.setUseBucketizedHiveInputFormat(true);
+    } else if (op.getChildOperators() != null) {
+      for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
+        setInputFormat(work, child);
+      }
+    }
+  }
+
+
+  @Override
   public ParseContext getParseContext(ParseContext pCtx, List<Task<? extends Serializable>> rootTasks) {
     return new ParseContext(conf, pCtx.getQB(), pCtx.getParseTree(),
         pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(),
@@ -452,67 +179,6 @@ public class MapReduceCompiler {
   }
 
   /**
-   * A helper function to generate a column stats task on top of map-red task. The column stats
-   * task fetches from the output of the map-red task, constructs the column stats object and
-   * persists it to the metastore.
-   *
-   * This method generates a plan with a column stats task on top of map-red task and sets up the
-   * appropriate metadata to be used during execution.
-   *
-   * @param qb
-   */
-  @SuppressWarnings("unchecked")
-  private void genColumnStatsTask(QB qb, List<LoadTableDesc> loadTableWork,
-      List<LoadFileDesc> loadFileWork, List<Task<? extends Serializable>> rootTasks) {
-    QBParseInfo qbParseInfo = qb.getParseInfo();
-    ColumnStatsTask cStatsTask = null;
-    ColumnStatsWork cStatsWork = null;
-    FetchWork fetch = null;
-    String tableName = qbParseInfo.getTableName();
-    String partName = qbParseInfo.getPartName();
-    List<String> colName = qbParseInfo.getColName();
-    List<String> colType = qbParseInfo.getColType();
-    boolean isTblLevel = qbParseInfo.isTblLvl();
-
-    String cols = loadFileWork.get(0).getColumns();
-    String colTypes = loadFileWork.get(0).getColumnTypes();
-
-    String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
-    TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
-
-    fetch = new FetchWork(loadFileWork.get(0).getSourcePath(),
-        resultTab, qb.getParseInfo().getOuterQueryLimit());
-
-    ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName,
-        colName, colType, isTblLevel);
-    cStatsWork = new ColumnStatsWork(fetch, cStatsDesc);
-    cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
-    rootTasks.add(cStatsTask);
-  }
-
-  /**
-   * Find all leaf tasks of the list of root tasks.
-   */
-  private void getLeafTasks(List<Task<? extends Serializable>> rootTasks,
-      HashSet<Task<? extends Serializable>> leaves) {
-
-    for (Task<? extends Serializable> root : rootTasks) {
-      getLeafTasks(root, leaves);
-    }
-  }
-
-  private void getLeafTasks(Task<? extends Serializable> task,
-      HashSet<Task<? extends Serializable>> leaves) {
-    if (task.getDependentTasks() == null) {
-      if (!leaves.contains(task)) {
-        leaves.add(task);
-      }
-    } else {
-      getLeafTasks(task.getDependentTasks(), leaves);
-    }
-  }
-
-  /**
    * Make a best guess at trying to find the number of reducers
    */
   private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) {
@@ -527,7 +193,8 @@ public class MapReduceCompiler {
     return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
   }
 
-  private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
+  @Override
+  protected void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
       GlobalLimitCtx globalLimitCtx)
       throws SemanticException {
 
@@ -603,4 +270,74 @@ public class MapReduceCompiler {
       console.printInfo("Automatically selecting local only mode for query");
     }
   }
+
+  /**
+   * Calls breakTaskTree on every root task, then runs a PhysicalOptimizer over a
+   * PhysicalContext built from the parse context, rootTasks and the fetch task of pCtx.
+   */
+  @Override
+  protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
+      ParseContext pCtx, Context ctx) throws SemanticException {
+    // reduce sink does not have any kids - since the plan by now has been broken up
+    // into multiple tasks, iterate over all tasks.
+    // For each task, go over all operators recursively
+    for (Task<? extends Serializable> root : rootTasks) {
+      breakTaskTree(root);
+    }
+
+    ParseContext parseCtx = getParseContext(pCtx, rootTasks);
+    PhysicalContext physCtx =
+        new PhysicalContext(conf, parseCtx, ctx, rootTasks, pCtx.getFetchTask());
+    new PhysicalOptimizer(physCtx, conf).optimize();
+  }
+
+  /**
+   * Generates the map-reduce plan by walking the operator tree from the top operators of
+   * pCtx. Each rule pairs an operator-name pattern with the processor to run on a match;
+   * GenMROperator is handed to the dispatcher alongside the rules.
+   *
+   * @throws SemanticException declared here; presumably raised while walking - TODO confirm
+   */
+  @Override
+  protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
+      List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+    // generate map reduce plans
+    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
+    GenMRProcContext procCtx = new GenMRProcContext(
+        conf,
+        new HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>(),
+        tempParseContext, mvTask, rootTasks,
+        new LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx>(),
+        inputs, outputs);
+
+    // create a walker which walks the tree in a DFS manner while maintaining the operator
+    // stack. The dispatcher generates the plan from the operator tree.
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
+        new GenMRTableScan1());
+    opRules.put(new RuleRegExp("R2",
+        TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+        new GenMRRedSink1());
+    opRules.put(new RuleRegExp("R3",
+        ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+        new GenMRRedSink2());
+    opRules.put(new RuleRegExp("R4", FileSinkOperator.getOperatorName() + "%"),
+        new GenMRFileSink1());
+    opRules.put(new RuleRegExp("R5", UnionOperator.getOperatorName() + "%"),
+        new GenMRUnion1());
+    opRules.put(new RuleRegExp("R6",
+        UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+        new GenMRRedSink3());
+    opRules.put(new RuleRegExp("R7", MapJoinOperator.getOperatorName() + "%"),
+        MapJoinFactory.getTableScanMapJoin());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules,
+        procCtx);
+
+    GraphWalker ogw = new GenMapRedWalker(disp);
+    List<Node> topNodes = new ArrayList<Node>(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Jan  6 21:32:38 2014
@@ -8958,7 +8958,7 @@ public class SemanticAnalyzer extends Ba
     if (!ctx.getExplainLogical()) {
       // At this point we have the complete operator tree
       // from which we want to create the map-reduce plan
-      MapReduceCompiler compiler = new MapReduceCompiler();
+      TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
       compiler.init(conf, console, db);
       compiler.compile(pCtx, rootTasks, inputs, outputs);
       fetchTask = pCtx.getFetchTask();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java Mon Jan  6 21:32:38 2014
@@ -239,7 +239,7 @@ public class TableAccessAnalyzer {
     // and filters.
     while (true) {
       parentOps = currOp.getParentOperators();
-      if (parentOps == null) {
+      if ((parentOps == null) || (parentOps.isEmpty())) {
         return (TableScanOperator) currOp;
       }