Posted to commits@hive.apache.org by se...@apache.org on 2016/09/08 01:52:50 UTC

[30/38] hive git commit: HIVE-14637 : edit or split MoveTask to commit job results to metastore (Sergey Shelukhin)

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 4e44d49..bb7001a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1305,6 +1305,7 @@ public final class GenMapRedUtils {
     //
     // 2. Constructing a conditional task consisting of a move task and a map reduce task
     //
+    // TODO# movetask is created here; handle MM tables
     MoveWork dummyMv = new MoveWork(null, null, null,
          new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
     MapWork cplan;

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
index ee67443..e2887fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
@@ -71,6 +71,7 @@ public class AnnotateRunTimeStatsOptimizer implements PhysicalPlanResolver {
       Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
       Set<Operator<? extends OperatorDesc>> ops = new HashSet<>();
 
+      /* TODO# wtf
       if (currTask instanceof MapRedTask) {
         MapRedTask mr = (MapRedTask) currTask;
         ops.addAll(mr.getWork().getAllOperators());
@@ -84,7 +85,7 @@ public class AnnotateRunTimeStatsOptimizer implements PhysicalPlanResolver {
         for (BaseWork w : sparkWork.getAllWork()) {
           ops.addAll(w.getAllOperators());
         }
-      }
+      }*/
 
       setOrAnnotateStats(ops, physicalContext.getParseContext());
       return null;

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 9329e00..d5a6d2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1056,6 +1056,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
         LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
             partSpec == null ? new HashMap<String, String>() : partSpec);
         ltd.setLbCtx(lbCtx);
+        // TODO# movetask is created here; handle MM tables
         Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false),
             conf);
         truncateTask.addDependentTask(moveTsk);
@@ -1668,6 +1669,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
       LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
           partSpec == null ? new HashMap<String, String>() : partSpec);
       ltd.setLbCtx(lbCtx);
+      // TODO# movetask is created here; handle MM tables
       Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false),
           conf);
       mergeTask.addDependentTask(moveTsk);

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index d562ddf..6cefbfc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -326,6 +326,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
         Utilities.getTableDesc(table), new TreeMap<String, String>(),
         replace);
+    // TODO# movetask is created here; handle MM tables
     Task<?> loadTableTask = TaskFactory.get(new MoveWork(getInputs(),
         getOutputs(), loadTableWork, null, false), conf);
     copyTask.addDependentTask(loadTableTask);
@@ -400,6 +401,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           Utilities.getTableDesc(table),
           partSpec.getPartSpec(), true);
       loadTableWork.setInheritTableSpecs(false);
+      // TODO# movetask is created here; handle MM tables
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
           getInputs(), getOutputs(), loadTableWork, null, false),
           conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index a49b813..9c9e6fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -270,6 +270,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
       loadTableWork.setInheritTableSpecs(false);
     }
 
+    // TODO# movetask is created here; handle MM tables
     Task<? extends Serializable> childTask = TaskFactory.get(new MoveWork(getInputs(),
         getOutputs(), loadTableWork, null, true, isLocal), conf);
     if (rTask != null) {
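
The TODO# markers in the DDL, import, and load analyzers above all flag the same construction: a MoveTask is attached as a dependent of whatever task produces the data. A minimal sketch of that recurring pattern, using only the MoveWork/TaskFactory calls visible in the hunks above (the helper name attachMoveTask is a placeholder); per the TODO# comments, these are the sites that will later need MM-aware commit handling rather than a plain file move:

    import java.io.Serializable;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
    import org.apache.hadoop.hive.ql.plan.MoveWork;

    // Sketch only: each flagged site wraps a LoadTableDesc in a MoveWork and
    // chains the resulting MoveTask after the task that writes the data
    // (truncate+merge, copy, merge, load, ...).
    static void attachMoveTask(Task<? extends Serializable> producerTask,
        LoadTableDesc ltd, HiveConf conf) {
      Task<MoveWork> moveTask =
          TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
      producerTask.addDependentTask(moveTask);
    }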

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 4353d3a..92ad50d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -331,14 +331,6 @@ public class ParseContext {
   }
 
   /**
-   * @param loadTableWork
-   *          the loadTableWork to set
-   */
-  public void setLoadTableWork(List<LoadTableDesc> loadTableWork) {
-    this.loadTableWork = loadTableWork;
-  }
-
-  /**
    * @return the loadFileWork
    */
   public List<LoadFileDesc> getLoadFileWork() {

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 499530e..b550235 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -267,7 +267,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   private final Map<JoinOperator, QBJoinTree> joinContext;
   private final Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
   private final HashMap<TableScanOperator, Table> topToTable;
-  private final Map<FileSinkOperator, Table> fsopToTable;
   private final List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
   private final HashMap<TableScanOperator, Map<String, String>> topToTableProps;
   private QB qb;
@@ -367,7 +366,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
     // Must be deterministic order map for consistent q-test output across Java versions
     topToTable = new LinkedHashMap<TableScanOperator, Table>();
-    fsopToTable = new HashMap<FileSinkOperator, Table>();
     reduceSinkOperatorsAddedByEnforceBucketingSorting = new ArrayList<ReduceSinkOperator>();
     topToTableProps = new HashMap<TableScanOperator, Map<String, String>>();
     destTableId = 1;
@@ -426,7 +424,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     opToPartToSkewedPruner.clear();
     opToSamplePruner.clear();
     nameToSplitSample.clear();
-    fsopToTable.clear();
     resultSchema = null;
     createVwDesc = null;
     viewsExpanded = null;
@@ -6547,6 +6544,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     ListBucketingCtx lbCtx = null;
     Map<String, String> partSpec = null;
     boolean isMmTable = false;
+    Long mmWriteId = null;
 
     switch (dest_type.intValue()) {
     case QBMetaData.DEST_TABLE: {
@@ -6570,7 +6568,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       }
 
       boolean isNonNativeTable = dest_tab.isNonNative();
-      isMmTable = isMmTable(dest_tab);
+      isMmTable = AcidUtils.isMmTable(dest_tab);
       if (isNonNativeTable || isMmTable) {
         queryTmpdir = dest_path;
       } else {
@@ -6603,9 +6601,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           acidOp = getAcidType(table_desc.getOutputFileFormatClass());
           checkAcidConstraints(qb, table_desc, dest_tab);
         }
+        try {
+          mmWriteId = getMmWriteId(dest_tab, isMmTable);
+        } catch (HiveException e) {
+          throw new SemanticException(e);
+        }
         boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
             dest_tab.getDbName(), dest_tab.getTableName());
-        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, isMmTable);
+        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, mmWriteId);
         ltd.setLbCtx(lbCtx);
         loadTableWork.add(ltd);
       } else {
@@ -6638,7 +6641,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
           .getAuthority(), partPath.toUri().getPath());
 
-      isMmTable = isMmTable(dest_tab);
+      isMmTable = AcidUtils.isMmTable(dest_tab);
       queryTmpdir = isMmTable ? dest_path : ctx.getTempDirForPath(dest_path);
       Utilities.LOG14535.info("createFS for partition specifying " + queryTmpdir + " from " + dest_path);
       table_desc = Utilities.getTableDesc(dest_tab);
@@ -6658,7 +6661,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         acidOp = getAcidType(table_desc.getOutputFileFormatClass());
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
-      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, isMmTable);
+      try {
+        mmWriteId = getMmWriteId(dest_tab, isMmTable);
+      } catch (HiveException e) {
+        // How is this a semantic exception? Stupid Java and signatures.
+        throw new SemanticException(e);
+      }
+      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, mmWriteId);
       ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
           dest_tab.getTableName()));
       ltd.setLbCtx(lbCtx);
@@ -6856,13 +6865,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx);
     }
 
+    assert isMmTable == (mmWriteId != null);
     FileSinkDesc fileSinkDesc = createFileSinkDesc(table_desc, dest_part,
         dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-        canBeMerged, isMmTable);
-    if (isMmTable) {
-      fileSinkDesc.setExecutionPrefix(ctx.getExecutionPrefix());
-    }
+        canBeMerged, mmWriteId);
 
     Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
         fileSinkDesc, fsRS, input), inputRR);
@@ -6876,7 +6883,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     FileSinkOperator fso = (FileSinkOperator) output;
     fso.getConf().setTable(dest_tab);
-    fsopToTable.put(fso, dest_tab);
     // the following code is used to collect column stats when
     // hive.stats.autogather=true
     // and it is an insert overwrite or insert into table
@@ -6895,10 +6901,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     return output;
   }
 
-  private static boolean isMmTable(Table table) {
-    // TODO: perhaps it should be a 3rd value for 'transactional'?
-    String value = table.getProperty(hive_metastoreConstants.TABLE_IS_MM);
-    return value != null && value.equalsIgnoreCase("true");
+  private static Long getMmWriteId(Table tbl, boolean isMmTable) throws HiveException {
+    if (!isMmTable) return null;
+    // Get the next write ID for this table. We will prefix files with this write ID.
+    return Hive.get().getNextTableWriteId(tbl.getDbName(), tbl.getTableName());
   }
 
   private FileSinkDesc createFileSinkDesc(TableDesc table_desc,
@@ -6906,7 +6912,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       boolean destTableIsAcid, boolean destTableIsTemporary,
       boolean destTableIsMaterialization, Path queryTmpdir,
       SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
-      RowSchema fsRS, boolean canBeMerged, boolean isMmTable) throws SemanticException {
+      RowSchema fsRS, boolean canBeMerged, Long mmWriteId) throws SemanticException {
     FileSinkDesc fileSinkDesc = new FileSinkDesc(
       queryTmpdir,
       table_desc,
@@ -6919,7 +6925,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       rsCtx.getPartnCols(),
       dpCtx,
       dest_path,
-      isMmTable);
+      mmWriteId);
 
     fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());
     // If this is an insert, update, or delete on an ACID table then mark that so the
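
A condensed sketch of the new write-ID plumbing added above: the ID is fetched once per destination (table or partition), HiveException is rethrown as SemanticException, and the resulting nullable value replaces the old boolean in both LoadTableDesc and createFileSinkDesc. The helper name below is illustrative; the calls it makes are the ones these hunks introduce (getNextTableWriteId is added by this patch series, not a stock Hive API):

    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    // Null means "not a micromanaged table"; otherwise it is the next write ID
    // for the table, which prefixes the files produced by this query.
    static Long mmWriteIdFor(Table destTab) throws SemanticException {
      if (!AcidUtils.isMmTable(destTab)) {
        return null;
      }
      try {
        return Hive.get().getNextTableWriteId(destTab.getDbName(), destTab.getTableName());
      } catch (HiveException e) {
        // Analyzer methods only declare SemanticException, hence the wrapping.
        throw new SemanticException(e);
      }
    }

The assert isMmTable == (mmWriteId != null) in the hunk above states the resulting invariant: downstream, a non-null write ID is what marks a sink or load as micromanaged.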

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index fb5ca57..723719d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -206,6 +206,7 @@ public abstract class TaskCompiler {
       }
     } else if (!isCStats) {
       for (LoadTableDesc ltd : loadTableWork) {
+        // TODO# HERE
         Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
         mvTask.add(tsk);
         // Check to see if we are stale'ing any indexes and auto-update them if we want

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index f51999d..63cc0cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -96,8 +96,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   private transient Table table;
   private Path destPath;
   private boolean isHiveServerQuery;
-  private boolean isMmTable;
-  private String executionPrefix;
+  private Long mmWriteId;
 
   public FileSinkDesc() {
   }
@@ -109,7 +108,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
       final boolean compressed, final int destTableId, final boolean multiFileSpray,
       final boolean canBeMerged, final int numFiles, final int totalFiles,
       final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
-      boolean isMmTable) {
+      Long mmWriteId) {
 
     this.dirName = dirName;
     this.tableInfo = tableInfo;
@@ -123,7 +122,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     this.dpCtx = dpCtx;
     this.dpSortState = DPSortState.NONE;
     this.destPath = destPath;
-    this.isMmTable = isMmTable;
+    this.mmWriteId = mmWriteId;
   }
 
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -145,7 +144,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   public Object clone() throws CloneNotSupportedException {
     FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
         destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
-        partitionCols, dpCtx, destPath, isMmTable);
+        partitionCols, dpCtx, destPath, mmWriteId);
     ret.setCompressCodec(compressCodec);
     ret.setCompressType(compressType);
     ret.setGatherStats(gatherStats);
@@ -159,7 +158,6 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     ret.setWriteType(writeType);
     ret.setTransactionId(txnId);
     ret.setStatsTmpDir(statsTmpDir);
-    ret.setExecutionPrefix(executionPrefix);
     return ret;
   }
 
@@ -254,7 +252,11 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   }
 
   public boolean isMmTable() {
-    return isMmTable;
+    return mmWriteId != null;
+  }
+
+  public long getMmWriteId() {
+    return mmWriteId;
   }
 
   public boolean isMaterialization() {
@@ -482,12 +484,4 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   public void setStatsTmpDir(String statsCollectionTempDir) {
     this.statsTmpDir = statsCollectionTempDir;
   }
-
-  public String getExecutionPrefix() {
-    return this.executionPrefix;
-  }
-
-  public void setExecutionPrefix(String value) {
-    this.executionPrefix = value;
-  }
 }
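
On the FileSinkDesc side, the boolean flag and the separate execution prefix are both replaced by the single nullable mmWriteId field. A consumer-side sketch with a hypothetical helper name; note that getMmWriteId() returns a primitive long and therefore unboxes the stored Long, so it is only safe to call after isMmTable() has been checked:

    import org.apache.hadoop.hive.ql.plan.FileSinkDesc;

    // Hypothetical helper for illustration: the MM-ness of a sink is now
    // derived from the write ID rather than stored as a separate boolean.
    static long requireMmWriteId(FileSinkDesc desc) {
      if (!desc.isMmTable()) {      // equivalent to mmWriteId != null
        throw new IllegalStateException("not a micromanaged file sink");
      }
      // Unboxing is safe here because isMmTable() guarantees a non-null value.
      return desc.getMmWriteId();
    }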

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 3b49197..fc8726c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -42,7 +42,7 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
   // Need to remember whether this is an acid compliant operation, and if so whether it is an
   // insert, update, or delete.
   private AcidUtils.Operation writeType;
-  private boolean isMmTable;
+  private Long mmWriteId;
 
   // TODO: the below seems like they should just be combined into partitionDesc
   private org.apache.hadoop.hive.ql.plan.TableDesc table;
@@ -52,10 +52,10 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
       final boolean replace,
-      final AcidUtils.Operation writeType, boolean isMmTable) {
+      final AcidUtils.Operation writeType, Long mmWriteId) {
     super(sourcePath);
     Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/);
-    init(table, partitionSpec, replace, writeType, isMmTable);
+    init(table, partitionSpec, replace, writeType, mmWriteId);
   }
 
   /**
@@ -70,15 +70,14 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
                        final Map<String, String> partitionSpec,
                        final boolean replace) {
     // TODO# we assume mm=false here
-    this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, false);
+    this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, null);
   }
 
   public LoadTableDesc(final Path sourcePath,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
-      final AcidUtils.Operation writeType, boolean isMmTable) {
-    // TODO# we assume mm=false here
-    this(sourcePath, table, partitionSpec, true, writeType, isMmTable);
+      final AcidUtils.Operation writeType, Long mmWriteId) {
+    this(sourcePath, table, partitionSpec, true, writeType, mmWriteId);
   }
 
   /**
@@ -91,22 +90,21 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
                        final org.apache.hadoop.hive.ql.plan.TableDesc table,
                        final Map<String, String> partitionSpec) {
     // TODO# we assume mm=false here
-    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, false);
+    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, null);
   }
 
   public LoadTableDesc(final Path sourcePath,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final DynamicPartitionCtx dpCtx,
       final AcidUtils.Operation writeType,
-      boolean isReplace,
-      boolean isMmTable) {
+      boolean isReplace, Long mmWriteId) {
     super(sourcePath);
     Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/);
     this.dpCtx = dpCtx;
     if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
-      init(table, dpCtx.getPartSpec(), isReplace, writeType, isMmTable);
+      init(table, dpCtx.getPartSpec(), isReplace, writeType, mmWriteId);
     } else {
-      init(table, new LinkedHashMap<String, String>(), isReplace, writeType, isMmTable);
+      init(table, new LinkedHashMap<String, String>(), isReplace, writeType, mmWriteId);
     }
   }
 
@@ -114,12 +112,12 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
       final boolean replace,
-      AcidUtils.Operation writeType, boolean isMmTable) {
+      AcidUtils.Operation writeType, Long mmWriteId) {
     this.table = table;
     this.partitionSpec = partitionSpec;
     this.replace = replace;
     this.writeType = writeType;
-    this.isMmTable = isMmTable;
+    this.mmWriteId = mmWriteId;
   }
 
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -147,7 +145,7 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
 
   @Explain(displayName = "micromanaged table")
   public boolean isMmTable() {
-    return isMmTable;
+    return mmWriteId != null;
   }
 
   public void setReplace(boolean replace) {
@@ -187,4 +185,8 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
   public AcidUtils.Operation getWriteType() {
     return writeType;
   }
+
+  public Long getMmWriteId() {
+    return mmWriteId;
+  }
 }
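
LoadTableDesc callers likewise pass the write ID (or null) where they previously passed a boolean. A sketch of the two cases using the six-argument constructor from the hunk above; the method and variable names are illustrative only:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
    import org.apache.hadoop.hive.ql.plan.TableDesc;

    static void buildLoadDescs(Path sourcePath, TableDesc tableDesc, Long mmWriteId) {
      Map<String, String> partSpec = new LinkedHashMap<>();

      // Regular table: null write ID, so isMmTable() reports false.
      LoadTableDesc plainLoad = new LoadTableDesc(sourcePath, tableDesc, partSpec,
          /* replace */ true, AcidUtils.Operation.NOT_ACID, /* mmWriteId */ null);

      // MM table: the write ID allocated at compile time travels with the plan
      // and is exposed again via getMmWriteId() at execution time.
      LoadTableDesc mmLoad = new LoadTableDesc(sourcePath, tableDesc, partSpec,
          /* replace */ false, AcidUtils.Operation.NOT_ACID, mmWriteId);
    }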

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 227b0d2..9f498c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
  */
 @Explain(displayName = "Move Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class MoveWork implements Serializable {
-  // TODO# all the places where MoveWork is created need to be handled.
   private static final long serialVersionUID = 1L;
   private LoadTableDesc loadTableWork;
   private LoadFileDesc loadFileWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index d3c3611..066d2b6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -146,7 +146,7 @@ public class TestExecDriver extends TestCase {
         db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true);
         db.createTable(src, cols, null, TextInputFormat.class,
             HiveIgnoreKeyTextOutputFormat.class);
-        db.loadTable(hadoopDataFile[i], src, false, true, false, false, false);
+        db.loadTable(hadoopDataFile[i], src, false, true, false, false, false, null);
         i++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 1c27873..909114c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -285,7 +285,7 @@ public class TestFileSinkOperator {
       partColMap.put(PARTCOL_NAME, null);
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
       //todo: does this need the finalDestination?
-      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null, false);
+      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null, null);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q
index 11259cb..8d19df6 100644
--- a/ql/src/test/queries/clientpositive/mm_current.q
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -4,18 +4,30 @@ set hive.exec.dynamic.partition.mode=nonstrict;
 set hive.fetch.task.conversion=none;
 set tez.grouping.min-size=1;
 set tez.grouping.max-size=2;
-set  hive.tez.auto.reducer.parallelism=false;
+set hive.tez.auto.reducer.parallelism=false;
+
+drop table part_mm;
+drop table simple_mm;
+drop table intermediate;
 
 create table intermediate(key int) partitioned by (p int) stored as orc;
-insert into table intermediate partition(p='455') select key from src limit 3;
-insert into table intermediate partition(p='456') select key from src limit 3;
-insert into table intermediate partition(p='457') select key from src limit 3;
+insert into table intermediate partition(p='455') select key from src limit 2;
+insert into table intermediate partition(p='456') select key from src limit 2;
   
-create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true');
+create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true');
+
+explain insert into table part_mm partition(key_mm='455') select key from intermediate;
+insert into table part_mm partition(key_mm='455') select key from intermediate;
+insert into table part_mm partition(key_mm='456') select key from intermediate;
+insert into table part_mm partition(key_mm='455') select key from intermediate;
+select * from part_mm;
 
-explain insert into table simple_mm partition(key_mm='455') select key from intermediate;
-insert into table simple_mm partition(key_mm='455') select key from intermediate;
+create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true');
+insert into table simple_mm select key from intermediate;
+insert into table simple_mm select key from intermediate;
+select * from simple_mm;
 
+drop table part_mm;
 drop table simple_mm;
 drop table intermediate;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/test/results/clientpositive/llap/mm_current.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out b/ql/src/test/results/clientpositive/llap/mm_current.q.out
index 8f1af4c..f357020 100644
--- a/ql/src/test/results/clientpositive/llap/mm_current.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out
@@ -1,3 +1,15 @@
+PREHOOK: query: drop table part_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table part_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table simple_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table simple_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table intermediate
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table intermediate
+POSTHOOK: type: DROPTABLE
 PREHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
@@ -6,44 +18,35 @@ POSTHOOK: query: create table intermediate(key int) partitioned by (p int) store
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@intermediate
-PREHOOK: query: insert into table intermediate partition(p='455') select key from src limit 3
+PREHOOK: query: insert into table intermediate partition(p='455') select key from src limit 2
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@intermediate@p=455
-POSTHOOK: query: insert into table intermediate partition(p='455') select key from src limit 3
+POSTHOOK: query: insert into table intermediate partition(p='455') select key from src limit 2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 POSTHOOK: Output: default@intermediate@p=455
 POSTHOOK: Lineage: intermediate PARTITION(p=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: insert into table intermediate partition(p='456') select key from src limit 3
+PREHOOK: query: insert into table intermediate partition(p='456') select key from src limit 2
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@intermediate@p=456
-POSTHOOK: query: insert into table intermediate partition(p='456') select key from src limit 3
+POSTHOOK: query: insert into table intermediate partition(p='456') select key from src limit 2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 POSTHOOK: Output: default@intermediate@p=456
 POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: insert into table intermediate partition(p='457') select key from src limit 3
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: default@intermediate@p=457
-POSTHOOK: query: insert into table intermediate partition(p='457') select key from src limit 3
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: default@intermediate@p=457
-POSTHOOK: Lineage: intermediate PARTITION(p=457).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: query: create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@simple_mm
-POSTHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: Output: default@part_mm
+POSTHOOK: query: create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@simple_mm
-PREHOOK: query: explain insert into table simple_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: Output: default@part_mm
+PREHOOK: query: explain insert into table part_mm partition(key_mm='455') select key from intermediate
 PREHOOK: type: QUERY
-POSTHOOK: query: explain insert into table simple_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: query: explain insert into table part_mm partition(key_mm='455') select key from intermediate
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -60,19 +63,19 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: intermediate
-                  Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: key (type: int)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
                     File Output Operator
                       compressed: false
-                      Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+                      Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
                       table:
                           input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
                           output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                           serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                          name: default.simple_mm
+                          name: default.part_mm
             Execution mode: llap
             LLAP IO: all inputs
 
@@ -89,27 +92,133 @@ STAGE PLANS:
               input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
               output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
               serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
-              name: default.simple_mm
+              name: default.part_mm
           micromanaged table: true
 
   Stage: Stage-3
     Stats-Aggr Operator
 
-PREHOOK: query: insert into table simple_mm partition(key_mm='455') select key from intermediate
+PREHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: Lineage: part_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table part_mm partition(key_mm='456') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@part_mm@key_mm=456
+POSTHOOK: query: insert into table part_mm partition(key_mm='456') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@part_mm@key_mm=456
+POSTHOOK: Lineage: part_mm PARTITION(key_mm=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: Lineage: part_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from part_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@part_mm
+PREHOOK: Input: default@part_mm@key_mm=455
+PREHOOK: Input: default@part_mm@key_mm=456
+#### A masked pattern was here ####
+POSTHOOK: query: select * from part_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@part_mm
+POSTHOOK: Input: default@part_mm@key_mm=455
+POSTHOOK: Input: default@part_mm@key_mm=456
+#### A masked pattern was here ####
+0	455
+455	455
+0	455
+455	455
+0	455
+455	455
+0	455
+455	455
+0	456
+455	456
+0	456
+455	456
+PREHOOK: query: create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@simple_mm
+PREHOOK: query: insert into table simple_mm select key from intermediate
 PREHOOK: type: QUERY
 PREHOOK: Input: default@intermediate
 PREHOOK: Input: default@intermediate@p=455
 PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@simple_mm@key_mm=455
-POSTHOOK: query: insert into table simple_mm partition(key_mm='455') select key from intermediate
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: insert into table simple_mm select key from intermediate
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@intermediate
 POSTHOOK: Input: default@intermediate@p=455
 POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@simple_mm@key_mm=455
-POSTHOOK: Lineage: simple_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Output: default@simple_mm
+POSTHOOK: Lineage: simple_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table simple_mm select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: insert into table simple_mm select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@simple_mm
+POSTHOOK: Lineage: simple_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from simple_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_mm
+#### A masked pattern was here ####
+0
+455
+0
+455
+0
+455
+0
+455
+PREHOOK: query: drop table part_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@part_mm
+PREHOOK: Output: default@part_mm
+POSTHOOK: query: drop table part_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@part_mm
+POSTHOOK: Output: default@part_mm
 PREHOOK: query: drop table simple_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@simple_mm