You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/10/28 02:08:49 UTC

hive git commit: HIVE-15027 : make sure export takes MM information into account (Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/hive-14535 e083d33ac -> 2e602596f


HIVE-15027 : make sure export takes MM information into account (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2e602596
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2e602596
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2e602596

Branch: refs/heads/hive-14535
Commit: 2e602596f7af6c302fd23628d4337673ca38be86
Parents: e083d33
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Oct 27 19:08:33 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Oct 27 19:08:33 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/metastore/ObjectStore.java      |  1 -
 .../apache/hadoop/hive/ql/exec/CopyTask.java    | 15 +++--
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 30 +++++++++
 .../hive/ql/io/CombineHiveInputFormat.java      | 29 ++++++---
 .../hive/ql/parse/ExportSemanticAnalyzer.java   | 66 +++++++++++++++++---
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  4 +-
 .../ql/plan/ConditionalResolverMergeFiles.java  |  1 +
 .../apache/hadoop/hive/ql/plan/CopyWork.java    | 53 ++++++++++++----
 .../org/apache/hadoop/hive/ql/plan/MapWork.java | 10 +++
 9 files changed, 171 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index a1b3a09..8ad7059 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -636,7 +636,6 @@ public class ObjectStore implements RawStore, Configurable {
 
     transactionStatus = TXN_STATUS.COMMITED;
     try {
-      LOG.error("TODO# grrrrr");
       currentTransaction.commit();
     } catch (Exception ex) {
       Throwable candidate = ex;

http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index a8a44bc..9f89ea5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -53,19 +53,24 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
 
   @Override
   public int execute(DriverContext driverContext) {
+    Path[] from = work.getFromPaths(), to = work.getToPaths();
+    for (int i = 0; i < from.length; ++i) {
+      int result = copyOnePath(from[i], to[i]);
+      if (result != 0) return result;
+    }
+    return 0;
+  }
+
+  protected int copyOnePath(Path fromPath, Path toPath) {
     FileSystem dstFs = null;
-    Path toPath = null;
     try {
-      Path fromPath = work.getFromPath();
-      toPath = work.getToPath();
-
       console.printInfo("Copying data from " + fromPath.toString(), " to "
           + toPath.toString());
 
       FileSystem srcFs = fromPath.getFileSystem(conf);
       dstFs = toPath.getFileSystem(conf);
 
-      FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.isSourceMm());
+      FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs());
       if (srcs == null || srcs.length == 0) {
         if (work.isErrorOnSrcEmpty()) {
           console.printError("No files matching path: " + fromPath.toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6774d4d..8e506aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4077,4 +4077,34 @@ public final class Utilities {
     }
   }
 
+  /**
+   * @return the complete list of valid MM directories under a table/partition path; null
+   * if the entire directory is valid (has no uncommitted/temporary files).
+   */
+  public static List<Path> getValidMmDirectoriesFromTableOrPart(Path path, Configuration conf,
+      ValidWriteIds ids, int lbLevels) throws IOException {
+    Utilities.LOG14535.info("Looking for valid MM paths under " + path);
+    // NULL means this directory is entirely valid.
+    List<Path> result = null;
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus[] children = (lbLevels == 0) ? fs.listStatus(path)
+        : fs.globStatus(new Path(path, StringUtils.repeat("*" + Path.SEPARATOR, lbLevels) + "*"));
+    for (int i = 0; i < children.length; ++i) {
+      FileStatus file = children[i];
+      Path childPath = file.getPath();
+      Long writeId = ValidWriteIds.extractWriteId(childPath);
+      if (!file.isDirectory() || writeId == null || !ids.isValid(writeId)) {
+        Utilities.LOG14535.info("Skipping path " + childPath);
+        if (result == null) {
+          result = new ArrayList<>(children.length - 1);
+          for (int j = 0; j < i; ++j) {
+            result.add(children[j].getPath());
+          }
+        }
+      } else if (result != null) {
+        result.add(childPath);
+      }
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index cc1de11..15d6b9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -85,20 +87,22 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
     private final int start;
     private final int length;
     private final JobConf conf;
+    private final boolean isMerge;
 
-    public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
+    public CheckNonCombinablePathCallable(
+        Path[] paths, int start, int length, JobConf conf, boolean isMerge) {
       this.paths = paths;
       this.start = start;
       this.length = length;
       this.conf = conf;
+      this.isMerge = isMerge;
     }
 
     @Override
     public Set<Integer> call() throws Exception {
       Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
       for (int i = 0; i < length; i++) {
-        PartitionDesc part =
-            HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+        PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
                 pathToPartitionInfo, paths[i + start],
                 IOPrepareCache.get().allocatePartitionDescMap());
         // Use HiveInputFormat if any of the paths is not splittable
@@ -107,12 +111,16 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
             getInputFormatFromCache(inputFormatClass, conf);
         boolean isAvoidSplitCombine = inputFormat instanceof AvoidSplitCombination &&
             ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf);
-        boolean isMmTable = MetaStoreUtils.isInsertOnlyTable(part.getTableDesc().getProperties());
-        if (isAvoidSplitCombine || isMmTable) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The path [" + paths[i + start] +
+
+        // Combined splits are not supported for MM tables right now.
+        // However, the merge for MM always combines one directory and should ignore that it's MM.
+        boolean isMmTableNonMerge = !isMerge
+            && MetaStoreUtils.isInsertOnlyTable(part.getTableDesc().getProperties());
+        if (isAvoidSplitCombine || isMmTableNonMerge) {
+          //if (LOG.isDebugEnabled()) {
+            Utilities.LOG14535.info("The path [" + paths[i + start] +
                 "] is being parked for HiveInputFormat.getSplits");
-          }
+          //}
           nonCombinablePathIndices.add(i + start);
         }
       }
@@ -467,11 +475,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
     ExecutorService executor = Executors.newFixedThreadPool(numThreads);
     List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
     try {
+      boolean isMerge = mrwork.isMergeFromResolver();
       for (int i = 0; i < numThreads; i++) {
         int start = i * numPathPerThread;
         int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
-        futureList.add(executor.submit(
-            new CheckNonCombinablePathCallable(paths, start, length, job)));
+        futureList.add(executor.submit(new CheckNonCombinablePathCallable(
+            paths, start, length, job, isMerge)));
       }
       Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
       for (Future<Set<Integer>> future : futureList) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 475f2c9..12dea9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -18,6 +18,16 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import org.apache.hadoop.hive.common.ValidWriteIds;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
@@ -171,29 +181,69 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
               .getMsg("Exception while writing out the local file"), e);
     }
 
-    if (!(replicationSpec.isMetadataOnly() || (ts == null))) {
+    if (replicationSpec.isMetadataOnly() || (ts == null)) return;
+
+    try {
       Path parentPath = new Path(toURI);
+      boolean isMmTable = MetaStoreUtils.isInsertOnlyTable(ts.tableHandle.getParameters());
+      Utilities.LOG14535.info("Exporting table " + ts.tableName + " / "
+          + ts.tableHandle.getTableName() + ": " + isMmTable);
+
+      int lbLevels = isMmTable && ts.tableHandle.isStoredAsSubDirectories()
+          ? ts.tableHandle.getSkewedColNames().size() : 0;
+      ValidWriteIds ids = isMmTable ? db.getValidWriteIdsForTable(
+          ts.tableHandle.getDbName(), ts.tableHandle.getTableName()) : null;
       if (ts.tableHandle.isPartitioned()) {
         for (Partition partition : partitions) {
           Path fromPath = partition.getDataLocation();
           Path toPartPath = new Path(parentPath, partition.getName());
-          Task<? extends Serializable> rTask = TaskFactory.get(
-              new CopyWork(fromPath, toPartPath, false),
-              conf);
-          rootTasks.add(rTask);
+          CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toPartPath);
+          rootTasks.add(TaskFactory.get(cw, conf));
           inputs.add(new ReadEntity(partition));
         }
       } else {
         Path fromPath = ts.tableHandle.getDataLocation();
         Path toDataPath = new Path(parentPath, "data");
-        Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
-            fromPath, toDataPath, false), conf);
-        rootTasks.add(rTask);
+        CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toDataPath);
+        rootTasks.add(TaskFactory.get(cw, conf));
         inputs.add(new ReadEntity(ts.tableHandle));
       }
       outputs.add(toWriteEntity(parentPath));
+    } catch (HiveException | IOException ex) {
+      throw new SemanticException(ex);
     }
+  }
 
+  private CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidWriteIds ids,
+      Path fromPath, Path toDataPath) throws IOException {
+    List<Path> validPaths = null;
+    if (isMmTable) {
+      fromPath = fromPath.getFileSystem(conf).makeQualified(fromPath);
+      validPaths = Utilities.getValidMmDirectoriesFromTableOrPart(fromPath, conf, ids, lbLevels);
+    }
+    if (validPaths == null) {
+      return new CopyWork(fromPath, toDataPath, false); // Not MM, or no need to skip anything.
+    } else {
+      return createCopyWorkForValidPaths(fromPath, toDataPath, validPaths);
+    }
   }
 
+  private CopyWork createCopyWorkForValidPaths(
+      Path fromPath, Path toPartPath, List<Path> validPaths) {
+    Path[] from = new Path[validPaths.size()], to = new Path[validPaths.size()];
+    int i = 0;
+    String fromPathStr = fromPath.toString();
+    if (!fromPathStr.endsWith(Path.SEPARATOR)) {
+      fromPathStr += "/";
+    }
+    for (Path validPath : validPaths) {
+      from[i] = validPath;
+      // TODO: assumes the results are already qualified.
+      to[i] = new Path(toPartPath, validPath.toString().substring(fromPathStr.length()));
+      Utilities.LOG14535.info("Will copy " + from[i] + " to " + to[i]
+          + " based on dest " + toPartPath + ", from " + fromPathStr + ", subpath " + validPath);
+      ++i;
+    }
+    return new CopyWork(from, to);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 87b85c8..8aa076f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -347,7 +347,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         + mmWriteId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
 
     CopyWork cv = new CopyWork(dataPath, destPath, false);
-    cv.setIsSourceMm(isSourceMm);
+    cv.setSkipSourceMmDirs(isSourceMm);
     LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
         Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, mmWriteId);
     MoveWork mv = new MoveWork(getInputs(), getOutputs(), loadTableWork, null, false);
@@ -411,7 +411,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
           + mmWriteId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
       CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
-      cw.setIsSourceMm(isSourceMm);
+      cw.setSkipSourceMmDirs(isSourceMm);
       DDLWork dw = new DDLWork(getInputs(), getOutputs(), addPartitionDesc);
       LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
           partSpec.getPartSpec(), true, mmWriteId);

http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index 4635f18..7fcb1ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -330,6 +330,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
     mWork.setMinSplitSize(targetSize);
     mWork.setMinSplitSizePerNode(targetSize);
     mWork.setMinSplitSizePerRack(targetSize);
+    mWork.setIsMergeFromResolver(true);
   }
 
   private static class AverageSize {

http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index 2e484ba..c08911f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -30,10 +30,10 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
 @Explain(displayName = "Copy", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class CopyWork implements Serializable {
   private static final long serialVersionUID = 1L;
-  private Path fromPath;
-  private Path toPath;
+  private Path[] fromPath;
+  private Path[] toPath;
   private boolean errorOnSrcEmpty;
-  private boolean isMm = false;
+  private boolean isSkipMmDirs = false;
 
   public CopyWork() {
   }
@@ -43,18 +43,45 @@ public class CopyWork implements Serializable {
   }
 
   public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
+    this(new Path[] { fromPath }, new Path[] { toPath });
+    this.setErrorOnSrcEmpty(errorOnSrcEmpty);
+  }
+
+  public CopyWork(final Path[] fromPath, final Path[] toPath) {
+    if (fromPath.length != toPath.length) {
+      throw new RuntimeException(
+          "Cannot copy " + fromPath.length + " paths into " + toPath.length + " paths");
+    }
     this.fromPath = fromPath;
     this.toPath = toPath;
-    this.setErrorOnSrcEmpty(errorOnSrcEmpty);
   }
-  
+
+  // Keep backward compat in explain for single-file copy tasks.
   @Explain(displayName = "source", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-  public Path getFromPath() {
-    return fromPath;
+  public Path getFromPathExplain() {
+    return (fromPath == null || fromPath.length > 1) ? null : fromPath[0];
   }
 
   @Explain(displayName = "destination", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-  public Path getToPath() {
+  public Path getToPathExplain() {
+    return (toPath == null || toPath.length > 1) ? null : toPath[0];
+  }
+
+  @Explain(displayName = "sources", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public Path[] getFromPathsExplain() {
+    return (fromPath != null && fromPath.length > 1) ? fromPath : null;
+  }
+
+  @Explain(displayName = "destinations", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public Path[] getToPathsExplain() {
+    return (toPath != null && toPath.length > 1) ? toPath : null;
+  }
+
+  public Path[] getFromPaths() {
+    return fromPath;
+  }
+
+  public Path[] getToPaths() {
     return toPath;
   }
 
@@ -66,12 +93,14 @@ public class CopyWork implements Serializable {
     return errorOnSrcEmpty;
   }
 
-  public void setIsSourceMm(boolean isMm) {
-    this.isMm = isMm;
+  /** Whether the copy should ignore MM directories in the source, and copy their content to
+   * destination directly, rather than copying the directories themselves. */
+  public void setSkipSourceMmDirs(boolean isMm) {
+    this.isSkipMmDirs = isMm;
   }
 
-  public boolean isSourceMm() {
-    return isMm ;
+  public boolean doSkipSourceMmDirs() {
+    return isSkipMmDirs ;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 1be4d84..5a81a62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -148,6 +148,8 @@ public class MapWork extends BaseWork {
   /** Whether LLAP IO will be used for inputs. */
   private String llapIoDesc;
 
+  private boolean isMergeFromResolver;
+
   public MapWork() {}
 
   public MapWork(String name) {
@@ -718,4 +720,12 @@ public class MapWork extends BaseWork {
   public VectorizedRowBatch getVectorizedRowBatch() {
     return vectorizedRowBatch;
   }
+
+  public void setIsMergeFromResolver(boolean b) {
+    this.isMergeFromResolver = b;
+  }
+
+  public boolean isMergeFromResolver() {
+    return this.isMergeFromResolver;
+  }
 }